From 191402891813f1e55a79dcc5e5ddc3779063ad3e Mon Sep 17 00:00:00 2001 From: Teppo Naakka Date: Thu, 30 Nov 2023 10:27:47 +0200 Subject: [PATCH 01/15] feat(ingest/powerbi): add dax based dataset profiling --- .../ingestion/source/powerbi/config.py | 18 + .../ingestion/source/powerbi/powerbi.py | 55 +++ .../powerbi/rest_api_wrapper/data_classes.py | 10 + .../powerbi/rest_api_wrapper/powerbi_api.py | 66 ++-- .../rest_api_wrapper/powerbi_profiler.py | 219 +++++++++++ .../powerbi/golden_test_profiling.json | 152 ++++++++ .../integration/powerbi/test_profiling.py | 364 ++++++++++++++++++ 7 files changed, 857 insertions(+), 27 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_profiler.py create mode 100644 metadata-ingestion/tests/integration/powerbi/golden_test_profiling.json create mode 100644 metadata-ingestion/tests/integration/powerbi/test_profiling.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index b8cc34c234ffa4..1bb1c8c0e190b3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -60,9 +60,12 @@ class Constant: STATUS = "status" CHART_ID = "powerbi.linkedin.com/charts/{}" CHART_KEY = "chartKey" + COLUMN_TYPE = "columnType" + DATA_TYPE = "dataType" DASHBOARD = "dashboard" DASHBOARDS = "dashboards" DASHBOARD_KEY = "dashboardKey" + DESCRIPTION = "description" OWNERSHIP = "ownership" BROWSERPATH = "browsePaths" DASHBOARD_INFO = "dashboardInfo" @@ -108,6 +111,7 @@ class Constant: TABLES = "tables" EXPRESSION = "expression" SOURCE = "source" + SCHEMA_METADATA = "schemaMetadata" PLATFORM_NAME = "powerbi" REPORT_TYPE_NAME = BIAssetSubTypes.REPORT CHART_COUNT = "chartCount" @@ -228,6 +232,13 @@ class OwnershipMapping(ConfigModel): ) +class PowerBiProfilingConfig(ConfigModel): + enabled: bool = pydantic.Field( + default=False, + description="Whether profiling should be done.", + ) + + class PowerBiDashboardSourceConfig( StatefulIngestionConfigBase, DatasetSourceConfigMixin ): @@ -405,6 +416,13 @@ class PowerBiDashboardSourceConfig( "Works for M-Query where native SQL is used for transformation.", ) + profile_pattern: AllowDenyPattern = pydantic.Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns to filter tables for profiling during ingestion. Note that only tables " + "allowed by the `table_pattern` will be considered. 
Matched format is 'datasetname.tablename'", + ) + profiling: PowerBiProfilingConfig = PowerBiProfilingConfig() + @root_validator(skip_on_failure=True) def validate_extract_column_level_lineage(cls, values: Dict) -> Dict: flags = [ diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index dc4394efcf245b..bf73a554820409 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -43,6 +43,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionSourceBase, ) +from datahub.metadata._schema_classes import DatasetProfileClass from datahub.metadata.com.linkedin.pegasus2avro.common import ChangeAuditStamps from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( FineGrainedLineage, @@ -58,6 +59,7 @@ CorpUserKeyClass, DashboardInfoClass, DashboardKeyClass, + DatasetFieldProfileClass, DatasetLineageTypeClass, DatasetPropertiesClass, GlobalTagsClass, @@ -473,9 +475,62 @@ def to_datahub_dataset( Constant.DATASET, dataset.tags, ) + self.extract_profile(dataset_mcps, workspace, dataset, table, ds_urn) return dataset_mcps + def extract_profile( + self, + dataset_mcps: List[MetadataChangeProposalWrapper], + workspace: powerbi_data_classes.Workspace, + dataset: powerbi_data_classes.PowerBIDataset, + table: powerbi_data_classes.Table, + ds_urn: str, + ) -> None: + if not self.__config.profiling.enabled: + # Profiling not enabled + return + + if not self.__config.profile_pattern.allowed( + f"{workspace.name}.{dataset.name}.{table.name}" + ): + logger.info( + f"Table {table.name} in {dataset.name}, not allowed for profiling" + ) + return + logger.info(f"Profiling table: {table.name}") + + profile = DatasetProfileClass(timestampMillis=builder.get_sys_time()) + profile.rowCount = table.row_count + profile.fieldProfiles = [] + + columns: List[ + Union[powerbi_data_classes.Column, powerbi_data_classes.Measure] + ] = [*(table.columns or []), *(table.measures or [])] + for column in columns: + allowed_column = self.__config.profile_pattern.allowed( + f"{workspace.name}.{dataset.name}.{table.name}.{column.name}" + ) + if column.isHidden or not allowed_column: + logger.info(f"Column {column.name} not allowed for profiling") + continue + field_profile = DatasetFieldProfileClass(column.name or "") + field_profile.sampleValues = column.sample_values + field_profile.min = column.min + field_profile.max = column.max + field_profile.uniqueCount = column.unique_count + profile.fieldProfiles.append(field_profile) + + profile.columnCount = table.column_count + + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=ds_urn, + aspectName="datasetProfile", + aspect=profile, + ) + dataset_mcps.append(mcp) + @staticmethod def transform_tags(tags: List[str]) -> GlobalTagsClass: return GlobalTagsClass( diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py index 0d41ab00c66f5e..a97b8b56825aa3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py @@ -96,6 +96,10 @@ class Column: columnType: Optional[str] = None expression: Optional[str] = None description: Optional[str] = None + min: Optional[str] = None + 
max: Optional[str] = None + unique_count: Optional[int] = None + sample_values: Optional[List[str]] = None @dataclass @@ -108,6 +112,10 @@ class Measure: BooleanTypeClass, DateTypeClass, NullTypeClass, NumberTypeClass, StringTypeClass ] = dataclasses.field(default_factory=NullTypeClass) description: Optional[str] = None + min: Optional[str] = None + max: Optional[str] = None + unique_count: Optional[int] = None + sample_values: Optional[List[str]] = None @dataclass @@ -117,6 +125,8 @@ class Table: expression: Optional[str] = None columns: Optional[List[Column]] = None measures: Optional[List[Measure]] = None + row_count: Optional[int] = None + column_count: Optional[int] = None # Pointer to the parent dataset. dataset: Optional["PowerBIDataset"] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py index 304e999f81a843..dad3bd76aab79a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py @@ -26,6 +26,9 @@ AdminAPIResolver, RegularAPIResolver, ) +from datahub.ingestion.source.powerbi.rest_api_wrapper.powerbi_profiler import ( + PowerBiDatasetProfilingResolver, +) # Logger instance logger = logging.getLogger(__name__) @@ -47,6 +50,13 @@ def __init__(self, config: PowerBiDashboardSourceConfig) -> None: tenant_id=self.__config.tenant_id, ) + self.__profiling_resolver = PowerBiDatasetProfilingResolver( + client_id=self.__config.client_id, + client_secret=self.__config.client_secret, + tenant_id=self.__config.tenant_id, + config=self.__config, + ) + def log_http_error(self, message: str) -> Any: logger.warning(message) _, e, _ = sys.exc_info() @@ -286,11 +296,12 @@ def _parse_endorsement(endorsements: Optional[dict]) -> List[str]: return [endorsement] - def _get_workspace_datasets(self, scan_result: Optional[dict]) -> dict: + def _get_workspace_datasets(self, workspace: Workspace) -> dict: """ Filter out "dataset" from scan_result and return Dataset instance set """ dataset_map: dict = {} + scan_result = workspace.scan_result if scan_result is None: return dataset_map @@ -344,30 +355,33 @@ def _get_workspace_datasets(self, scan_result: Optional[dict]) -> dict: and len(table[Constant.SOURCE]) > 0 else None ) - dataset_instance.tables.append( - Table( - name=table[Constant.NAME], - full_name="{}.{}".format( - dataset_name.replace(" ", "_"), - table[Constant.NAME].replace(" ", "_"), - ), - expression=expression, - columns=[ - Column( - **column, - datahubDataType=FIELD_TYPE_MAPPING.get( - column["dataType"], FIELD_TYPE_MAPPING["Null"] - ), - ) - for column in table.get("columns", []) - ], - measures=[ - Measure(**measure) for measure in table.get("measures", []) - ], - dataset=dataset_instance, - ) + table = Table( + name=table[Constant.NAME], + full_name="{}.{}".format( + dataset_name.replace(" ", "_"), + table[Constant.NAME].replace(" ", "_"), + ), + expression=expression, + columns=[ + Column( + **column, + datahubDataType=FIELD_TYPE_MAPPING.get( + column["dataType"], FIELD_TYPE_MAPPING["Null"] + ), + ) + for column in table.get("columns", []) + ], + measures=[ + Measure(**measure) for measure in table.get("measures", []) + ], + dataset=dataset_instance, + row_count=None, + column_count=None, ) - + self.__profiling_resolver.profile_dataset( + dataset_instance, table, workspace.name + ) + 
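Review note: `profile_dataset` mutates the freshly built `Table` in place (filling `row_count`, `column_count`, and the per-column statistics) before the table is appended, and it runs synchronously while the scan result is being parsed. A rough cost sketch of that strategy, assuming one row-count query, one TOPN sample query, and one statistics query per visible column; the table and column counts below are made-up illustration values, not numbers from this change:

```python
# Back-of-envelope for the number of executeQueries REST calls the
# profiler issues per workspace under the per-table strategy above.
def profiling_request_count(n_tables: int, visible_columns_per_table: int) -> int:
    # 1 row-count query + 1 TOPN sample + 1 stats query per visible column
    return n_tables * (2 + visible_columns_per_table)


# Example figures only: 20 tables x 15 visible columns -> 340 REST calls.
print(profiling_request_count(20, 15))
```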
dataset_instance.tables.append(table) return dataset_map def _fill_metadata_from_scan_result( @@ -392,9 +406,7 @@ def _fill_metadata_from_scan_result( independent_datasets=[], ) cur_workspace.scan_result = workspace_metadata - cur_workspace.datasets = self._get_workspace_datasets( - cur_workspace.scan_result - ) + cur_workspace.datasets = self._get_workspace_datasets(cur_workspace) # Fetch endorsements tag if it is enabled from configuration if self.__config.extract_endorsements_to_tags: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_profiler.py new file mode 100644 index 00000000000000..573c39fe5c138e --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_profiler.py @@ -0,0 +1,219 @@ +import logging +import re +from typing import Dict, List, Union + +import requests + +from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceConfig +from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import ( + Column, + Measure, + PowerBIDataset, + Table, +) +from datahub.ingestion.source.powerbi.rest_api_wrapper.data_resolver import ( + RegularAPIResolver, +) + + +class ProfilerConstant: + DATASET_EXECUTE_QUERIES_POST = "DATASET_EXECUTE_QUERIES_POST" + + +ENDPOINT = { + ProfilerConstant.DATASET_EXECUTE_QUERIES_POST: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/datasets/{DATASET_ID}/executeQueries", +} + + +logger = logging.getLogger(__name__) + + +def get_column_name(table_and_col: str) -> str: + regex = re.compile(".*\\[(.*)\\]$") + m = regex.match(table_and_col) + if m: + return m.group(1) + return "" + + +def process_sample_result(result_data: dict) -> dict: + sample_data_by_column: Dict[str, List[str]] = {} + rows = result_data["results"][0]["tables"][0]["rows"] + for sample in rows: + for key, value in sample.items(): + if not value: + continue + column_name = get_column_name(key) + if column_name not in sample_data_by_column: + sample_data_by_column[column_name] = [] + sample_data_by_column[column_name].append(str(value)) + return sample_data_by_column + + +def process_column_result(result_data: dict) -> dict: + sample_data_by_column: Dict[str, str] = {} + rows = result_data["results"][0]["tables"][0]["rows"] + for sample in rows: + for key, value in sample.items(): + if not value: + continue + column_name = get_column_name(key) + if column_name != "unique_count": + value = str(value) + sample_data_by_column[column_name] = value + return sample_data_by_column + + +class PowerBiDatasetProfilingResolver(RegularAPIResolver): + config: PowerBiDashboardSourceConfig + overview_stats: dict + + def __init__( + self, + client_id: str, + client_secret: str, + tenant_id: str, + config: PowerBiDashboardSourceConfig, + ): + super(PowerBiDatasetProfilingResolver, self).__init__( + client_id, client_secret, tenant_id + ) + self.config = config + self.overview_stats = {} + + def get_row_count(self, dataset: PowerBIDataset, table: Table) -> int: + query = f""" + EVALUATE ROW("count", COUNTROWS ( '{table.name}' )) + """ + try: + data = self.execute_query(dataset, query) + rows = data["results"][0]["tables"][0]["rows"] + count = rows[0]["[count]"] + return count + except requests.exceptions.RequestException as ex: + logger.warning(getattr(ex.response, "text", "")) + logger.warning( + f"Profiling failed for getting row count for dataset {dataset.id}, with status code {getattr(ex.response, 'status_code', 
None)}", + ) + except (KeyError, IndexError) as ex: + logger.warning( + f"Profiling failed for getting row count for dataset {dataset.id}, with {ex}" + ) + return 0 + + def get_data_sample(self, dataset: PowerBIDataset, table: Table) -> dict: + try: + query = f"EVALUATE TOPN(3, '{table.name}')" + data = self.execute_query(dataset, query) + return process_sample_result(data) + except requests.exceptions.RequestException as ex: + logger.warning(getattr(ex.response, "text", "")) + logger.warning( + f"Getting sample with TopN failed for dataset {dataset.id}, with status code {getattr(ex.response, 'status_code', None)}", + ) + except (KeyError, IndexError) as ex: + logger.warning( + f"Getting sample with TopN failed for dataset {dataset.id}, with {ex}" + ) + + return {} + + def get_column_data( + self, dataset: PowerBIDataset, table: Table, column: Union[Column, Measure] + ) -> dict: + try: + logger.info(f"Column data query for {dataset.name}, {column.name}") + query = f""" + EVALUATE ROW( + "min", MIN('{table.name}'[{column.name}]), + "max", MAX('{table.name}'[{column.name}]), + "unique_count", COUNTROWS ( DISTINCT ( '{table.name}'[{column.name}] ) ) + )""" + data = self.execute_query(dataset, query) + return process_column_result(data) + except requests.exceptions.RequestException as ex: + logger.warning(getattr(ex.response, "text", "")) + logger.warning( + f"Getting column statistics failed for dataset {dataset.name}, {column.name}, with status code {getattr(ex.response, 'status_code', None)}", + ) + except (KeyError, IndexError) as ex: + logger.warning( + f"Getting column statistics failed for dataset {dataset.name}, {column.name}, with {ex}" + ) + + return {} + + def execute_query(self, dataset: PowerBIDataset, query: str) -> dict: + dataset_query_endpoint: str = ENDPOINT[ + ProfilerConstant.DATASET_EXECUTE_QUERIES_POST + ] + # Replace place holders + dataset_query_endpoint = dataset_query_endpoint.format( + POWERBI_BASE_URL=self.BASE_URL, + WORKSPACE_ID=dataset.workspace_id, + DATASET_ID=dataset.id, + ) + # Hit PowerBi + logger.info(f"Request to query endpoint URL={dataset_query_endpoint}") + payload = { + "queries": [ + { + "query": query, + } + ], + "serializerSettings": { + "includeNulls": True, + }, + } + response = self._request_session.post( + dataset_query_endpoint, + json=payload, + headers=self.get_authorization_header(), + ) + response.raise_for_status() + return response.json() + + def profile_dataset( + self, + dataset: PowerBIDataset, + table: Table, + workspace_name: str, + ) -> None: + if not self.config.profiling.enabled: + # Profiling not enabled + return + + if not self.config.profile_pattern.allowed( + f"{workspace_name}.{dataset.name}.{table.name}" + ): + logger.info( + f"Table {table.name} in {dataset.name}, not allowed for profiling" + ) + return + + logger.info(f"Profiling table: {table.name}") + row_count = self.get_row_count(dataset, table) + sample = self.get_data_sample(dataset, table) + + table.row_count = row_count + column_count = 0 + + columns: List[Union[Column, Measure]] = [ + *(table.columns or []), + *(table.measures or []), + ] + for column in columns: + if column.isHidden: + continue + + if sample and sample.get(column.name, None): + column.sample_values = sample.get(column.name, None) + + column_stats = self.get_column_data(dataset, table, column) + + for key, value in column_stats.items(): + setattr(column, key, value) + + column_count += 1 + + table.column_count = column_count diff --git 
a/metadata-ingestion/tests/integration/powerbi/golden_test_profiling.json b/metadata-ingestion/tests/integration/powerbi/golden_test_profiling.json new file mode 100644 index 00000000000000..9a57a01b914552 --- /dev/null +++ b/metadata-ingestion/tests/integration/powerbi/golden_test_profiling.json @@ -0,0 +1,152 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "json": { + "materialized": false, + "viewLogic": "let\n Source = PostgreSQL.Database(\"localhost\" , \"mics\" ),\n public_order_date = Source{[Schema=\"public\",Item=\"order_date\"]}[Data] \n in \n public_order_date", + "viewLanguage": "m_query" + } + }, + "systemMetadata": { + "lastObserved": 1645599600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "datasetId": "05169CD2-E713-41E6-9600-1D8066D95445" + }, + "externalUrl": "http://localhost/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445/details", + "name": "articles", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1645599600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1645599600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1645599600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "PowerBI Dataset Table", + "View" + ] + } + }, + "systemMetadata": { + "lastObserved": 1645599600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "json": { + "timestampMillis": 1645599600000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "rowCount": 542300, + "columnCount": 4, + "fieldProfiles": [ + { + "fieldPath": "link", + "sampleValues": [ + "http://example.org", + "http://example.org/111/22/foo", + "http://example.org/111/22" + ] + }, + { + "fieldPath": "description", + "sampleValues": [ + "this is a sample", + "this describes content", + "sample, this is" + ] + }, + { + "fieldPath": "topic", + "sampleValues": [ + "urgent matters", + "urgent matters", + "normal matters" + ] + }, + { + "fieldPath": "view_count", + "sampleValues": [ + "123455", + "123455", + "123455" + ] + } + ] + } + }, + 
"systemMetadata": { + "lastObserved": 1645599600000, + "runId": "powerbi-test" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/powerbi/test_profiling.py b/metadata-ingestion/tests/integration/powerbi/test_profiling.py new file mode 100644 index 00000000000000..6bc18e8551bcf9 --- /dev/null +++ b/metadata-ingestion/tests/integration/powerbi/test_profiling.py @@ -0,0 +1,364 @@ +import logging +import sys +from typing import Any, Dict +from unittest import mock + +from freezegun import freeze_time + +from datahub.ingestion.run.pipeline import Pipeline +from tests.test_helpers import mce_helpers + +FROZEN_TIME = "2022-02-23 07:00:00" + + +def scan_init_response(request, context): + # Request mock is passing POST input in the form of workspaces= + workspace_id = request.text.split("=")[1] + + w_id_vs_response: Dict[str, Any] = { + "64ED5CAD-7C10-4684-8180-826122881108": { + "id": "4674efd1-603c-4129-8d82-03cf2be05aff" + } + } + + return w_id_vs_response[workspace_id] + + +def admin_datasets_response(request, context): + return { + "value": [ + { + "id": "05169CD2-E713-41E6-9600-1D8066D95445", + "name": "library-dataset", + "webUrl": "http://localhost/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445", + } + ] + } + + +def execute_queries_response(request, context): + query = request.json()["queries"][0]["query"] + if "COLUMNSTATISTICS" in query: + return { + "results": [ + { + "tables": [ + { + "rows": [ + { + "[Table Name]": "articles", + "[Column Name]": "link", + "[Min]": 0, + "[Max]": 1, + "[Cardinality]": 2, + "[Max Length]": None, + }, + { + "[Table Name]": "articles", + "[Column Name]": "description", + "[Min]": "0", + "[Max]": "1", + "[Cardinality]": 2, + "[Max Length]": 1, + }, + { + "[Table Name]": "articles", + "[Column Name]": "RowNumber-aabb11", + "[Min]": "0", + "[Max]": "1", + "[Cardinality]": 2, + "[Max Length]": 1, + }, + { + "[Table Name]": "articles", + "[Column Name]": "topic", + "[Min]": 0, + "[Max]": 1, + "[Cardinality]": 2, + "[Max Length]": None, + }, + { + "[Table Name]": "articles", + "[Column Name]": "view_count", + "[Min]": 0, + "[Max]": 9993334, + "[Cardinality]": 23444, + "[Max Length]": None, + }, + ] + } + ] + } + ], + } + elif "COUNTROWS" in query: + return { + "results": [ + { + "tables": [ + { + "rows": [ + { + "[count]": 542300, + }, + ] + } + ] + } + ], + } + elif "TOPN" in query: + return { + "results": [ + { + "tables": [ + { + "rows": [ + { + "[link]": "http://example.org", + "[description]": "this is a sample", + "[topic]": "urgent matters", + "[view_count]": 123455, + }, + { + "[link]": "http://example.org/111/22/foo", + "[description]": "this describes content", + "[topic]": "urgent matters", + "[view_count]": 123455, + }, + { + "[link]": "http://example.org/111/22", + "[description]": "sample, this is", + "[topic]": "normal matters", + "[view_count]": 123455, + }, + ] + } + ] + } + ], + } + + +def register_mock_admin_api(request_mock: Any, override_data: dict = {}) -> None: + api_vs_response = { + "https://api.powerbi.com/v1.0/myorg/admin/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets": { + "method": "GET", + "status_code": 200, + "json": admin_datasets_response, + }, + "https://api.powerbi.com/v1.0/myorg/admin/groups": { + "method": "GET", + "status_code": 200, + "json": { + "@odata.count": 3, + "value": [ + { + "id": "64ED5CAD-7C10-4684-8180-826122881108", + "isReadOnly": True, + "name": "demo-workspace", + "type": "Workspace", + } + ], + }, + }, + 
"https://api.powerbi.com/v1.0/myorg/admin/groups/64ED5CAD-7C10-4684-8180-826122881108/dashboards": { + "method": "GET", + "status_code": 200, + "json": {"value": []}, + }, + "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445/datasources": { + "method": "GET", + "status_code": 200, + "json": { + "value": [ + { + "datasourceId": "DCE90B40-84D6-467A-9A5C-648E830E72D3", + "datasourceType": "PostgreSql", + "connectionDetails": { + "database": "library_db", + "server": "foo", + }, + }, + ] + }, + }, + "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445/executeQueries": { + "method": "POST", + "status_code": 200, + "json": execute_queries_response, + }, + "https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanStatus/4674efd1-603c-4129-8d82-03cf2be05aff": { + "method": "GET", + "status_code": 200, + "json": { + "status": "SUCCEEDED", + }, + }, + "https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanStatus/a674efd1-603c-4129-8d82-03cf2be05aff": { + "method": "GET", + "status_code": 200, + "json": { + "status": "SUCCEEDED", + }, + }, + "https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanResult/4674efd1-603c-4129-8d82-03cf2be05aff": { + "method": "GET", + "status_code": 200, + "json": { + "workspaces": [ + { + "id": "64ED5CAD-7C10-4684-8180-826122881108", + "name": "demo-workspace", + "state": "Active", + "datasets": [ + { + "id": "05169CD2-E713-41E6-9600-1D8066D95445", + "endorsementDetails": {"endorsement": "Promoted"}, + "name": "test_sf_pbi_test", + "tables": [ + { + "name": "articles", + "source": [ + { + "expression": 'let\n Source = PostgreSQL.Database("localhost" , "mics" ),\n public_order_date = Source{[Schema="public",Item="order_date"]}[Data] \n in \n public_order_date', + } + ], + "datasourceUsages": [ + { + "datasourceInstanceId": "DCE90B40-84D6-467A-9A5C-648E830E72D3", + } + ], + "columns": [ + { + "name": "link", + "description": "column description", + "dataType": "String", + "columnType": "DATA", + "isHidden": False, + }, + { + "name": "description", + "description": "column description", + "dataType": "String", + "columnType": "DATA", + "isHidden": False, + }, + { + "name": "topic", + "description": "column description", + "dataType": "String", + "columnType": "DATA", + "isHidden": False, + }, + ], + "measures": [ + { + "name": "view_count", + "description": "column description", + "expression": "let\n x", + "isHidden": False, + } + ], + }, + ], + }, + ], + "dashboards": [], + "reports": [], + }, + ] + }, + }, + "https://api.powerbi.com/v1.0/myorg/admin/workspaces/getInfo": { + "method": "POST", + "status_code": 200, + "json": scan_init_response, + }, + } + + api_vs_response.update(override_data) + + for url in api_vs_response.keys(): + request_mock.register_uri( + api_vs_response[url]["method"], + url, + json=api_vs_response[url]["json"], + status_code=api_vs_response[url]["status_code"], + ) + + +def enable_logging(): + # set logging to console + logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) + logging.getLogger().setLevel(logging.DEBUG) + + +def mock_msal_cca(*args, **kwargs): + class MsalClient: + def acquire_token_for_client(self, *args, **kwargs): + return { + "access_token": "dummy", + } + + return MsalClient() + + +def default_source_config(): + return { + "client_id": "foo", + "client_secret": "bar", + "tenant_id": "0B0C960B-FCDF-4D0F-8C45-2E03BB59DDEB", + "workspace_id": 
"64ED5CAD-7C10-4684-8180-826122881108", + "extract_lineage": True, + "extract_reports": False, + "admin_apis_only": True, + "extract_ownership": True, + "convert_lineage_urns_to_lowercase": False, + "extract_independent_datasets": True, + "workspace_id_pattern": {"allow": ["64ED5CAD-7C10-4684-8180-826122881108"]}, + "env": "DEV", + "extract_workspaces_to_containers": False, + "profiling": { + "enabled": True, + }, + "profile_pattern": {"allow": [".*"]}, + } + + +@freeze_time(FROZEN_TIME) +@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) +def test_profiling(mock_msal, pytestconfig, tmp_path, mock_time, requests_mock): + enable_logging() + + test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" + + register_mock_admin_api(request_mock=requests_mock) + + pipeline = Pipeline.create( + { + "run_id": "powerbi-test", + "source": { + "type": "powerbi", + "config": { + **default_source_config(), + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/powerbi_profiling.json", + }, + }, + } + ) + + pipeline.run() + pipeline.raise_from_status() + golden_file = "golden_test_profiling.json" + + mce_helpers.check_golden_file( + pytestconfig, + output_path=f"{tmp_path}/powerbi_profiling.json", + golden_path=f"{test_resources_dir}/{golden_file}", + ) From 6c82b509b72ff994900b9a7f37c54b1e93862026 Mon Sep 17 00:00:00 2001 From: Teppo Naakka Date: Thu, 30 Nov 2023 13:32:44 +0200 Subject: [PATCH 02/15] docs(ingest/powerbi): add some documentation about profiling --- metadata-ingestion/docs/sources/powerbi/powerbi_pre.md | 4 ++++ .../src/datahub/ingestion/source/powerbi/config.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md b/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md index fcfae6cd1e6d78..e074401828cea4 100644 --- a/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md +++ b/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md @@ -107,6 +107,10 @@ By default, extracting endorsement information to tags is disabled. The feature Please note that the default implementation overwrites tags for the ingested entities, if you need to preserve existing tags, consider using a [transformer](../../../../metadata-ingestion/docs/transformer/dataset_transformer.md#simple-add-dataset-globaltags) with `semantics: PATCH` tags instead of `OVERWRITE`. +## Profiling + +The profiling implementation is done through querying [DAX query endpoint](https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/execute-queries). Therefore the principal needs to have permission to query the datasets to be profiled. Profiling is done with column based queries to be able to handle wide datasets without timeouts. + ## Admin Ingestion vs. Basic Ingestion PowerBI provides two sets of API i.e. [Basic API and Admin API](https://learn.microsoft.com/en-us/rest/api/power-bi/). 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index 1bb1c8c0e190b3..184bcb6d6612f3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -235,7 +235,7 @@ class OwnershipMapping(ConfigModel): class PowerBiProfilingConfig(ConfigModel): enabled: bool = pydantic.Field( default=False, - description="Whether profiling should be done.", + description="Whether profiling of PowerBI datasets should be done", ) From 7f6d96f9a24d1cc2616cd5e4c8d5fa340698d3e8 Mon Sep 17 00:00:00 2001 From: Teppo Naakka Date: Thu, 11 Jan 2024 16:46:57 +0200 Subject: [PATCH 03/15] fix: move profiling data to a separate dataclass --- .../powerbi/rest_api_wrapper/data_classes.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py index a97b8b56825aa3..3dea2c8ceaed05 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py @@ -85,6 +85,14 @@ def __hash__(self): return hash(self.__members()) +@dataclass +class MeasureProfile: + min: Optional[str] = None + max: Optional[str] = None + unique_count: Optional[int] = None + sample_values: Optional[List[str]] = None + + @dataclass class Column: name: str @@ -96,10 +104,7 @@ class Column: columnType: Optional[str] = None expression: Optional[str] = None description: Optional[str] = None - min: Optional[str] = None - max: Optional[str] = None - unique_count: Optional[int] = None - sample_values: Optional[List[str]] = None + measure_profile: Optional[MeasureProfile] = None @dataclass @@ -112,10 +117,7 @@ class Measure: BooleanTypeClass, DateTypeClass, NullTypeClass, NumberTypeClass, StringTypeClass ] = dataclasses.field(default_factory=NullTypeClass) description: Optional[str] = None - min: Optional[str] = None - max: Optional[str] = None - unique_count: Optional[int] = None - sample_values: Optional[List[str]] = None + measure_profile: Optional[MeasureProfile] = None @dataclass From 4fadf1f3243f7e3e220cb1f9459772a7c1855289 Mon Sep 17 00:00:00 2001 From: Teppo Naakka Date: Fri, 12 Jan 2024 17:42:52 +0200 Subject: [PATCH 04/15] refactor: powerbi profiling to existing resolver --- .../ingestion/source/powerbi/config.py | 1 + .../ingestion/source/powerbi/powerbi.py | 14 +- .../powerbi/rest_api_wrapper/data_resolver.py | 161 ++++++++++++- .../powerbi/rest_api_wrapper/powerbi_api.py | 20 +- .../rest_api_wrapper/powerbi_profiler.py | 219 ------------------ .../rest_api_wrapper/profiling_utils.py | 38 +++ .../source/powerbi/rest_api_wrapper/query.py | 17 ++ .../powerbi/golden_test_profiling.json | 146 +++++++----- .../integration/powerbi/test_profiling.py | 62 ++--- 9 files changed, 330 insertions(+), 348 deletions(-) delete mode 100644 metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_profiler.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/query.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py 
b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index 184bcb6d6612f3..1772a924ca9fb6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -46,6 +46,7 @@ class Constant: Authorization = "Authorization" WORKSPACE_ID = "workspaceId" DASHBOARD_ID = "powerbi.linkedin.com/dashboards/{}" + DATASET_EXECUTE_QUERIES = "DATASET_EXECUTE_QUERIES_POST" DATASET_ID = "datasetId" REPORT_ID = "reportId" SCAN_ID = "ScanId" diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index bf73a554820409..c68218b93da1b6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -514,12 +514,14 @@ def extract_profile( if column.isHidden or not allowed_column: logger.info(f"Column {column.name} not allowed for profiling") continue - field_profile = DatasetFieldProfileClass(column.name or "") - field_profile.sampleValues = column.sample_values - field_profile.min = column.min - field_profile.max = column.max - field_profile.uniqueCount = column.unique_count - profile.fieldProfiles.append(field_profile) + measure_profile = column.measure_profile + if measure_profile: + field_profile = DatasetFieldProfileClass(column.name or "") + field_profile.sampleValues = measure_profile.sample_values + field_profile.min = measure_profile.min + field_profile.max = measure_profile.max + field_profile.uniqueCount = measure_profile.unique_count + profile.fieldProfiles.append(field_profile) profile.columnCount = table.column_count diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py index c6314c212d104d..b485d14dbd946e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py @@ -2,7 +2,7 @@ import math from abc import ABC, abstractmethod from time import sleep -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union import msal import requests @@ -10,18 +10,27 @@ from requests.adapters import HTTPAdapter from urllib3 import Retry -from datahub.configuration.common import ConfigurationError +from datahub.configuration.common import ConfigurationError, AllowDenyPattern from datahub.ingestion.source.powerbi.config import Constant from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import ( + Column, Dashboard, + Measure, + MeasureProfile, Page, PowerBIDataset, Report, + Table, Tile, User, Workspace, new_powerbi_dataset, ) +from datahub.ingestion.source.powerbi.rest_api_wrapper.profiling_utils import ( + process_column_result, + process_sample_result, +) +from datahub.ingestion.source.powerbi.rest_api_wrapper.query import DaxQuery # Logger instance logger = logging.getLogger(__name__) @@ -105,6 +114,16 @@ def get_tiles_endpoint(self, workspace: Workspace, dashboard_id: str) -> str: def _get_pages_by_report(self, workspace: Workspace, report_id: str) -> List[Page]: pass + @abstractmethod + def profile_dataset( + self, + dataset: PowerBIDataset, + table: Table, + workspace_name: str, + profile_pattern: Optional[AllowDenyPattern], + ) -> None: + pass + @abstractmethod def get_dataset( self, 
workspace_id: str, dataset_id: str @@ -374,6 +393,7 @@ class RegularAPIResolver(DataResolverBase): Constant.REPORT_GET: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/reports/{REPORT_ID}", Constant.REPORT_LIST: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/reports", Constant.PAGE_BY_REPORT: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/reports/{REPORT_ID}/pages", + Constant.DATASET_EXECUTE_QUERIES: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/datasets/{DATASET_ID}/executeQueries", } def get_dataset( @@ -502,6 +522,133 @@ def _get_pages_by_report(self, workspace: Workspace, report_id: str) -> List[Pag def get_users(self, workspace_id: str, entity: str, entity_id: str) -> List[User]: return [] # User list is not available in regular access + def _execute_profiling_query(self, dataset: PowerBIDataset, query: str) -> dict: + dataset_query_endpoint: str = self.API_ENDPOINTS[ + Constant.DATASET_EXECUTE_QUERIES + ] + # Replace place holders + dataset_query_endpoint = dataset_query_endpoint.format( + POWERBI_BASE_URL=self.BASE_URL, + WORKSPACE_ID=dataset.workspace_id, + DATASET_ID=dataset.id, + ) + # Hit PowerBi + logger.info(f"Request to query endpoint URL={dataset_query_endpoint}") + payload = { + "queries": [ + { + "query": query, + } + ], + "serializerSettings": { + "includeNulls": True, + }, + } + response = self._request_session.post( + dataset_query_endpoint, + json=payload, + headers=self.get_authorization_header(), + ) + response.raise_for_status() + return response.json() + + def _get_row_count(self, dataset: PowerBIDataset, table: Table) -> int: + query = DaxQuery.row_count_query(table.name) + try: + data = self._execute_profiling_query(dataset, query) + rows = data["results"][0]["tables"][0]["rows"] + count = rows[0]["[count]"] + return count + except requests.exceptions.RequestException as ex: + logger.warning(getattr(ex.response, "text", "")) + logger.warning( + f"Profiling failed for getting row count for dataset {dataset.id}, with status code {getattr(ex.response, 'status_code', None)}", + ) + except (KeyError, IndexError) as ex: + logger.warning( + f"Profiling failed for getting row count for dataset {dataset.id}, with {ex}" + ) + return 0 + + def _get_data_sample(self, dataset: PowerBIDataset, table: Table) -> dict: + try: + query = DaxQuery.data_sample_query(table.name) + data = self._execute_profiling_query(dataset, query) + return process_sample_result(data) + except requests.exceptions.RequestException as ex: + logger.warning(getattr(ex.response, "text", "")) + logger.warning( + f"Getting sample with TopN failed for dataset {dataset.id}, with status code {getattr(ex.response, 'status_code', None)}", + ) + except (KeyError, IndexError) as ex: + logger.warning( + f"Getting sample with TopN failed for dataset {dataset.id}, with {ex}" + ) + return {} + + def _get_column_data( + self, dataset: PowerBIDataset, table: Table, column: Union[Column, Measure] + ) -> dict: + try: + logger.info(f"Column data query for {dataset.name}, {column.name}") + query = DaxQuery.column_data_query(table.name, column.name) + data = self._execute_profiling_query(dataset, query) + return process_column_result(data) + except requests.exceptions.RequestException as ex: + logger.warning(getattr(ex.response, "text", "")) + logger.warning( + f"Getting column statistics failed for dataset {dataset.name}, {column.name}, with status code {getattr(ex.response, 'status_code', None)}", + ) + except (KeyError, IndexError) as ex: + logger.warning( + f"Getting column statistics failed for dataset {dataset.name}, {column.name}, with {ex}" + ) + return {} + + 
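Review note: `_get_row_count`, `_get_data_sample`, and `_get_column_data` repeat an identical try/except/log pattern. One possible way to collapse the duplication; a sketch only, where `_safe_profiling_call` is a hypothetical helper, not something this change defines:

```python
import logging
from typing import Callable, Optional

import requests

logger = logging.getLogger(__name__)


def _safe_profiling_call(
    action: str, subject: str, fn: Callable[[], dict]
) -> Optional[dict]:
    # Funnel HTTP failures and unexpected result shapes through one place.
    try:
        return fn()
    except requests.exceptions.RequestException as ex:
        logger.warning(getattr(ex.response, "text", ""))
        logger.warning(
            f"{action} failed for {subject}, with status code "
            f"{getattr(ex.response, 'status_code', None)}"
        )
    except (KeyError, IndexError) as ex:
        logger.warning(f"{action} failed for {subject}, with {ex}")
    return None
```

Each `_get_*` method would then reduce to building its DAX query and post-processing the result.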
def profile_dataset( + self, + dataset: PowerBIDataset, + table: Table, + workspace_name: str, + profile_pattern: Optional[AllowDenyPattern], + ) -> None: + if not profile_pattern: + logger.info( + f"Profile pattern not configured, not profiling" + ) + return + + if not profile_pattern.allowed(f"{workspace_name}.{dataset.name}.{table.name}"): + logger.info( + f"Table {table.name} in {dataset.name}, not allowed for profiling" + ) + return + + logger.info(f"Profiling table: {table.name}") + row_count = self._get_row_count(dataset, table) + sample = self._get_data_sample(dataset, table) + + table.row_count = row_count + column_count = 0 + + columns: List[Union[Column, Measure]] = [ + *(table.columns or []), + *(table.measures or []), + ] + for column in columns: + if column.isHidden: + continue + + column_sample = sample.get(column.name, None) if sample else None + column_stats = self._get_column_data(dataset, table, column) + + column.measure_profile = MeasureProfile( + sample_values=column_sample, **column_stats + ) + column_count += 1 + + table.column_count = column_count + class AdminAPIResolver(DataResolverBase): # Admin access endpoints @@ -804,3 +951,13 @@ def get_dataset_parameters( ) -> Dict[str, str]: logger.debug("Get dataset parameter is unsupported in Admin API") return {} + + def profile_dataset( + self, + dataset: PowerBIDataset, + table: Table, + workspace_name: str, + profile_pattern: Optional[AllowDenyPattern], + ) -> None: + logger.debug("Profile dataset is unsupported in Admin API") + return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py index dad3bd76aab79a..4a4cc44ee91245 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py @@ -26,9 +26,6 @@ AdminAPIResolver, RegularAPIResolver, ) -from datahub.ingestion.source.powerbi.rest_api_wrapper.powerbi_profiler import ( - PowerBiDatasetProfilingResolver, -) # Logger instance logger = logging.getLogger(__name__) @@ -50,13 +47,6 @@ def __init__(self, config: PowerBiDashboardSourceConfig) -> None: tenant_id=self.__config.tenant_id, ) - self.__profiling_resolver = PowerBiDatasetProfilingResolver( - client_id=self.__config.client_id, - client_secret=self.__config.client_secret, - tenant_id=self.__config.tenant_id, - config=self.__config, - ) - def log_http_error(self, message: str) -> Any: logger.warning(message) _, e, _ = sys.exc_info() @@ -378,9 +368,13 @@ def _get_workspace_datasets(self, workspace: Workspace) -> dict: row_count=None, column_count=None, ) - self.__profiling_resolver.profile_dataset( - dataset_instance, table, workspace.name - ) + if self.__config.profiling.enabled: + self._get_resolver().profile_dataset( + dataset_instance, + table, + workspace.name, + self.__config.profile_pattern, + ) dataset_instance.tables.append(table) return dataset_map diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_profiler.py deleted file mode 100644 index 573c39fe5c138e..00000000000000 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_profiler.py +++ /dev/null @@ -1,219 +0,0 @@ -import logging -import re -from typing import Dict, List, Union - -import requests - -from 
datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceConfig -from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import ( - Column, - Measure, - PowerBIDataset, - Table, -) -from datahub.ingestion.source.powerbi.rest_api_wrapper.data_resolver import ( - RegularAPIResolver, -) - - -class ProfilerConstant: - DATASET_EXECUTE_QUERIES_POST = "DATASET_EXECUTE_QUERIES_POST" - - -ENDPOINT = { - ProfilerConstant.DATASET_EXECUTE_QUERIES_POST: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/datasets/{DATASET_ID}/executeQueries", -} - - -logger = logging.getLogger(__name__) - - -def get_column_name(table_and_col: str) -> str: - regex = re.compile(".*\\[(.*)\\]$") - m = regex.match(table_and_col) - if m: - return m.group(1) - return "" - - -def process_sample_result(result_data: dict) -> dict: - sample_data_by_column: Dict[str, List[str]] = {} - rows = result_data["results"][0]["tables"][0]["rows"] - for sample in rows: - for key, value in sample.items(): - if not value: - continue - column_name = get_column_name(key) - if column_name not in sample_data_by_column: - sample_data_by_column[column_name] = [] - sample_data_by_column[column_name].append(str(value)) - return sample_data_by_column - - -def process_column_result(result_data: dict) -> dict: - sample_data_by_column: Dict[str, str] = {} - rows = result_data["results"][0]["tables"][0]["rows"] - for sample in rows: - for key, value in sample.items(): - if not value: - continue - column_name = get_column_name(key) - if column_name != "unique_count": - value = str(value) - sample_data_by_column[column_name] = value - return sample_data_by_column - - -class PowerBiDatasetProfilingResolver(RegularAPIResolver): - config: PowerBiDashboardSourceConfig - overview_stats: dict - - def __init__( - self, - client_id: str, - client_secret: str, - tenant_id: str, - config: PowerBiDashboardSourceConfig, - ): - super(PowerBiDatasetProfilingResolver, self).__init__( - client_id, client_secret, tenant_id - ) - self.config = config - self.overview_stats = {} - - def get_row_count(self, dataset: PowerBIDataset, table: Table) -> int: - query = f""" - EVALUATE ROW("count", COUNTROWS ( '{table.name}' )) - """ - try: - data = self.execute_query(dataset, query) - rows = data["results"][0]["tables"][0]["rows"] - count = rows[0]["[count]"] - return count - except requests.exceptions.RequestException as ex: - logger.warning(getattr(ex.response, "text", "")) - logger.warning( - f"Profiling failed for getting row count for dataset {dataset.id}, with status code {getattr(ex.response, 'status_code', None)}", - ) - except (KeyError, IndexError) as ex: - logger.warning( - f"Profiling failed for getting row count for dataset {dataset.id}, with {ex}" - ) - return 0 - - def get_data_sample(self, dataset: PowerBIDataset, table: Table) -> dict: - try: - query = f"EVALUATE TOPN(3, '{table.name}')" - data = self.execute_query(dataset, query) - return process_sample_result(data) - except requests.exceptions.RequestException as ex: - logger.warning(getattr(ex.response, "text", "")) - logger.warning( - f"Getting sample with TopN failed for dataset {dataset.id}, with status code {getattr(ex.response, 'status_code', None)}", - ) - except (KeyError, IndexError) as ex: - logger.warning( - f"Getting sample with TopN failed for dataset {dataset.id}, with {ex}" - ) - - return {} - - def get_column_data( - self, dataset: PowerBIDataset, table: Table, column: Union[Column, Measure] - ) -> dict: - try: - logger.info(f"Column data query for {dataset.name}, {column.name}") - query = 
f""" - EVALUATE ROW( - "min", MIN('{table.name}'[{column.name}]), - "max", MAX('{table.name}'[{column.name}]), - "unique_count", COUNTROWS ( DISTINCT ( '{table.name}'[{column.name}] ) ) - )""" - data = self.execute_query(dataset, query) - return process_column_result(data) - except requests.exceptions.RequestException as ex: - logger.warning(getattr(ex.response, "text", "")) - logger.warning( - f"Getting column statistics failed for dataset {dataset.name}, {column.name}, with status code {getattr(ex.response, 'status_code', None)}", - ) - except (KeyError, IndexError) as ex: - logger.warning( - f"Getting column statistics failed for dataset {dataset.name}, {column.name}, with {ex}" - ) - - return {} - - def execute_query(self, dataset: PowerBIDataset, query: str) -> dict: - dataset_query_endpoint: str = ENDPOINT[ - ProfilerConstant.DATASET_EXECUTE_QUERIES_POST - ] - # Replace place holders - dataset_query_endpoint = dataset_query_endpoint.format( - POWERBI_BASE_URL=self.BASE_URL, - WORKSPACE_ID=dataset.workspace_id, - DATASET_ID=dataset.id, - ) - # Hit PowerBi - logger.info(f"Request to query endpoint URL={dataset_query_endpoint}") - payload = { - "queries": [ - { - "query": query, - } - ], - "serializerSettings": { - "includeNulls": True, - }, - } - response = self._request_session.post( - dataset_query_endpoint, - json=payload, - headers=self.get_authorization_header(), - ) - response.raise_for_status() - return response.json() - - def profile_dataset( - self, - dataset: PowerBIDataset, - table: Table, - workspace_name: str, - ) -> None: - if not self.config.profiling.enabled: - # Profiling not enabled - return - - if not self.config.profile_pattern.allowed( - f"{workspace_name}.{dataset.name}.{table.name}" - ): - logger.info( - f"Table {table.name} in {dataset.name}, not allowed for profiling" - ) - return - - logger.info(f"Profiling table: {table.name}") - row_count = self.get_row_count(dataset, table) - sample = self.get_data_sample(dataset, table) - - table.row_count = row_count - column_count = 0 - - columns: List[Union[Column, Measure]] = [ - *(table.columns or []), - *(table.measures or []), - ] - for column in columns: - if column.isHidden: - continue - - if sample and sample.get(column.name, None): - column.sample_values = sample.get(column.name, None) - - column_stats = self.get_column_data(dataset, table, column) - - for key, value in column_stats.items(): - setattr(column, key, value) - - column_count += 1 - - table.column_count = column_count diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py new file mode 100644 index 00000000000000..409be69f1091eb --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py @@ -0,0 +1,38 @@ +import re +from typing import Dict, List + + +def get_column_name(table_and_col: str) -> str: + regex = re.compile(".*\\[(.*)\\]$") + m = regex.match(table_and_col) + if m: + return m.group(1) + return "" + + +def process_sample_result(result_data: dict) -> dict: + sample_data_by_column: Dict[str, List[str]] = {} + rows = result_data["results"][0]["tables"][0]["rows"] + for sample in rows: + for key, value in sample.items(): + if not value: + continue + column_name = get_column_name(key) + if column_name not in sample_data_by_column: + sample_data_by_column[column_name] = [] + sample_data_by_column[column_name].append(str(value)) + return 
sample_data_by_column + + +def process_column_result(result_data: dict) -> dict: + sample_data_by_column: Dict[str, str] = {} + rows = result_data["results"][0]["tables"][0]["rows"] + for sample in rows: + for key, value in sample.items(): + if not value: + continue + column_name = get_column_name(key) + if column_name != "unique_count": + value = str(value) + sample_data_by_column[column_name] = value + return sample_data_by_column diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/query.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/query.py new file mode 100644 index 00000000000000..cb66210efc8e56 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/query.py @@ -0,0 +1,17 @@ +class DaxQuery: + @staticmethod + def data_sample_query(table_name: str) -> str: + return f"EVALUATE TOPN(3, '{table_name}')" + + @staticmethod + def column_data_query(table_name: str, column_name: str) -> str: + return f""" + EVALUATE ROW( + "min", MIN('{table_name}'[{column_name}]), + "max", MAX('{table_name}'[{column_name}]), + "unique_count", COUNTROWS ( DISTINCT ( '{table_name}'[{column_name}] ) ) + )""" + + @staticmethod + def row_count_query(table_name: str) -> str: + return f"""EVALUATE ROW("count", COUNTROWS ( '{table_name}' ))""" diff --git a/metadata-ingestion/tests/integration/powerbi/golden_test_profiling.json b/metadata-ingestion/tests/integration/powerbi/golden_test_profiling.json index 9a57a01b914552..580a8d1a1db119 100644 --- a/metadata-ingestion/tests/integration/powerbi/golden_test_profiling.json +++ b/metadata-ingestion/tests/integration/powerbi/golden_test_profiling.json @@ -1,7 +1,7 @@ [ { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,PROD)", "changeType": "UPSERT", "aspectName": "viewProperties", "aspect": { @@ -13,12 +13,13 @@ }, "systemMetadata": { "lastObserved": 1645599600000, - "runId": "powerbi-test" + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" } }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,PROD)", "changeType": "UPSERT", "aspectName": "datasetProperties", "aspect": { @@ -28,75 +29,19 @@ }, "externalUrl": "http://localhost/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445/details", "name": "articles", - "description": "", + "description": "Library Dataset", "tags": [] } }, "systemMetadata": { "lastObserved": 1645599600000, - "runId": "powerbi-test" + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" } }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1645599600000, - "runId": "powerbi-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", - "changeType": "UPSERT", - "aspectName": "upstreamLineage", - "aspect": { - "json": { - "upstreams": [ - { - "auditStamp": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)", - 
"type": "TRANSFORMED" - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1645599600000, - "runId": "powerbi-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", - "changeType": "UPSERT", - "aspectName": "subTypes", - "aspect": { - "json": { - "typeNames": [ - "PowerBI Dataset Table", - "View" - ] - } - }, - "systemMetadata": { - "lastObserved": 1645599600000, - "runId": "powerbi-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,DEV)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,PROD)", "changeType": "UPSERT", "aspectName": "datasetProfile", "aspect": { @@ -111,6 +56,9 @@ "fieldProfiles": [ { "fieldPath": "link", + "uniqueCount": 15, + "min": "3", + "max": "34333", "sampleValues": [ "http://example.org", "http://example.org/111/22/foo", @@ -119,6 +67,9 @@ }, { "fieldPath": "description", + "uniqueCount": 15, + "min": "3", + "max": "34333", "sampleValues": [ "this is a sample", "this describes content", @@ -127,6 +78,9 @@ }, { "fieldPath": "topic", + "uniqueCount": 15, + "min": "3", + "max": "34333", "sampleValues": [ "urgent matters", "urgent matters", @@ -135,6 +89,9 @@ }, { "fieldPath": "view_count", + "uniqueCount": 15, + "min": "3", + "max": "34333", "sampleValues": [ "123455", "123455", @@ -146,7 +103,68 @@ }, "systemMetadata": { "lastObserved": 1645599600000, - "runId": "powerbi-test" + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "PowerBI Dataset Table", + "View" + ] + } + }, + "systemMetadata": { + "lastObserved": 1645599600000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1645599600000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.articles,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1645599600000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/powerbi/test_profiling.py b/metadata-ingestion/tests/integration/powerbi/test_profiling.py index 6bc18e8551bcf9..7955386de8940b 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_profiling.py +++ b/metadata-ingestion/tests/integration/powerbi/test_profiling.py @@ -38,7 +38,7 @@ def admin_datasets_response(request, context): def execute_queries_response(request, context): query = request.json()["queries"][0]["query"] - if "COLUMNSTATISTICS" in query: + if "unique_count" in query: return { "results": [ { @@ -46,44 +46,9 @@ def execute_queries_response(request, 
context): { "rows": [ { - "[Table Name]": "articles", - "[Column Name]": "link", - "[Min]": 0, - "[Max]": 1, - "[Cardinality]": 2, - "[Max Length]": None, - }, - { - "[Table Name]": "articles", - "[Column Name]": "description", - "[Min]": "0", - "[Max]": "1", - "[Cardinality]": 2, - "[Max Length]": 1, - }, - { - "[Table Name]": "articles", - "[Column Name]": "RowNumber-aabb11", - "[Min]": "0", - "[Max]": "1", - "[Cardinality]": 2, - "[Max Length]": 1, - }, - { - "[Table Name]": "articles", - "[Column Name]": "topic", - "[Min]": 0, - "[Max]": 1, - "[Cardinality]": 2, - "[Max Length]": None, - }, - { - "[Table Name]": "articles", - "[Column Name]": "view_count", - "[Min]": 0, - "[Max]": 9993334, - "[Cardinality]": 23444, - "[Max Length]": None, + "[min]": 3, + "[max]": 34333, + "[unique_count]": 15, }, ] } @@ -142,12 +107,12 @@ def execute_queries_response(request, context): def register_mock_admin_api(request_mock: Any, override_data: dict = {}) -> None: api_vs_response = { - "https://api.powerbi.com/v1.0/myorg/admin/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets": { + "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets": { "method": "GET", "status_code": 200, "json": admin_datasets_response, }, - "https://api.powerbi.com/v1.0/myorg/admin/groups": { + "https://api.powerbi.com/v1.0/myorg/groups?%24top=1000&%24skip=0&%24filter=type+eq+%27Workspace%27": { "method": "GET", "status_code": 200, "json": { @@ -162,7 +127,7 @@ def register_mock_admin_api(request_mock: Any, override_data: dict = {}) -> None ], }, }, - "https://api.powerbi.com/v1.0/myorg/admin/groups/64ED5CAD-7C10-4684-8180-826122881108/dashboards": { + "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/dashboards": { "method": "GET", "status_code": 200, "json": {"value": []}, @@ -275,6 +240,16 @@ def register_mock_admin_api(request_mock: Any, override_data: dict = {}) -> None "status_code": 200, "json": scan_init_response, }, + "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445": { + "method": "GET", + "status_code": 200, + "json": { + "id": "05169CD2-E713-41E6-9600-1D8066D95445", + "name": "library-dataset", + "description": "Library Dataset", + "webUrl": "http://localhost/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445", + }, + }, } api_vs_response.update(override_data) @@ -312,12 +287,11 @@ def default_source_config(): "workspace_id": "64ED5CAD-7C10-4684-8180-826122881108", "extract_lineage": True, "extract_reports": False, - "admin_apis_only": True, + "admin_apis_only": False, "extract_ownership": True, "convert_lineage_urns_to_lowercase": False, "extract_independent_datasets": True, "workspace_id_pattern": {"allow": ["64ED5CAD-7C10-4684-8180-826122881108"]}, - "env": "DEV", "extract_workspaces_to_containers": False, "profiling": { "enabled": True, From 3edbf3b18459dc8064350afb2157f879c1ee568a Mon Sep 17 00:00:00 2001 From: Teppo Naakka Date: Fri, 12 Jan 2024 17:53:46 +0200 Subject: [PATCH 05/15] fix: add profiling capability decorator --- .../src/datahub/ingestion/source/powerbi/powerbi.py | 4 ++++ .../source/powerbi/rest_api_wrapper/data_resolver.py | 4 +--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index c68218b93da1b6..e74a310358e67c 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
@@ -1200,6 +1200,10 @@ def report_to_datahub_work_units(
     SourceCapability.LINEAGE_FINE,
     "Disabled by default, configured using `extract_column_level_lineage`. ",
 )
+@capability(
+    SourceCapability.DATA_PROFILING,
+    "Optionally enabled via configuration `profiling.enabled`",
+)
 class PowerBiDashboardSource(StatefulIngestionSourceBase):
     """
     This plugin extracts the following:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py
index b485d14dbd946e..09cb0b06f6b4de 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py
@@ -613,9 +613,7 @@ def profile_dataset(
         profile_pattern: Optional[AllowDenyPattern],
     ) -> None:
         if not profile_pattern:
-            logger.info(
-                f"Profile pattern not configured, not profiling"
-            )
+            logger.info(f"Profile pattern not configured, not profiling")
             return
 
         if not profile_pattern.allowed(f"{workspace_name}.{dataset.name}.{table.name}"):

From 71ae650b339045e87ad007788bb86a91b8cd685f Mon Sep 17 00:00:00 2001
From: Teppo Naakka
Date: Thu, 18 Jan 2024 10:51:42 +0200
Subject: [PATCH 06/15] docs: update docs about required permissions for profiling

---
 docs/quick-ingestion-guides/powerbi/setup.md           | 2 +-
 metadata-ingestion/docs/sources/powerbi/powerbi_pre.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/quick-ingestion-guides/powerbi/setup.md b/docs/quick-ingestion-guides/powerbi/setup.md
index 2eba1903ebf4c8..b8c6d66c97e2bb 100644
--- a/docs/quick-ingestion-guides/powerbi/setup.md
+++ b/docs/quick-ingestion-guides/powerbi/setup.md
@@ -70,7 +70,7 @@ In order to configure ingestion from PowerBI, you'll first have to ensure you ha
    - `Enhance admin APIs responses with detailed metadata`
    - `Enhance admin APIs responses with DAX and mashup expressions`
 
-   f. **Add Security Group to Workspace:** Navigate to `Workspaces` window and open workspace which you want to ingest as shown in below screenshot and click on `Access` and add `powerbi-connector-app-security-group` as member
+   f. **Add Security Group to Workspace:** Navigate to `Workspaces` window and open workspace which you want to ingest as shown in below screenshot and click on `Access` and add `powerbi-connector-app-security-group` as member. In most cases the `Viewer` role is enough, but profiling requires the `Contributor` role.

workspace-window-underlined

diff --git a/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md b/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md
index e074401828cea4..43ffc44413b648 100644
--- a/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md
+++ b/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md
@@ -109,7 +109,7 @@ Please note that the default implementation overwrites tags for the ingested ent
 
 ## Profiling
 
-The profiling implementation is done through querying [DAX query endpoint](https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/execute-queries). Therefore the principal needs to have permission to query the datasets to be profiled. Profiling is done with column based queries to be able to handle wide datasets without timeouts.
+The profiling implementation is done through querying [DAX query endpoint](https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/execute-queries). Therefore the principal needs to have permission to query the datasets to be profiled. Usually this means that the service principal should have the `Contributor` role for the workspace to be ingested. Profiling is done with column-based queries to be able to handle wide datasets without timeouts.
 
 ## Admin Ingestion vs. Basic Ingestion
 PowerBI provides two sets of API i.e. [Basic API and Admin API](https://learn.microsoft.com/en-us/rest/api/power-bi/).

From d4bb88f88a7857664239c0a80a9b51a10e86d502 Mon Sep 17 00:00:00 2001
From: Teppo Naakka
Date: Thu, 18 Jan 2024 12:27:07 +0200
Subject: [PATCH 07/15] fix: import order

---
 .../ingestion/source/powerbi/rest_api_wrapper/data_resolver.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py
index 94084d1c28f6b4..91255784974a35 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py
@@ -11,7 +11,7 @@
 from requests.adapters import HTTPAdapter
 from urllib3 import Retry
 
-from datahub.configuration.common import ConfigurationError, AllowDenyPattern
+from datahub.configuration.common import AllowDenyPattern, ConfigurationError
 from datahub.ingestion.source.powerbi.config import Constant
 from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import (
     Column,

From 5aefa51738be4498b0300949b350909c7ace9276 Mon Sep 17 00:00:00 2001
From: Teppo Naakka
Date: Thu, 18 Jan 2024 15:48:11 +0200
Subject: [PATCH 08/15] fix: flake8 error

---
 .../ingestion/source/powerbi/rest_api_wrapper/data_resolver.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py
index 91255784974a35..4b84acfe82d295 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py
@@ -626,7 +626,7 @@ def profile_dataset(
         profile_pattern: Optional[AllowDenyPattern],
     ) -> None:
         if not profile_pattern:
-            logger.info(f"Profile pattern not configured, not profiling")
+            logger.info("Profile pattern not configured, not profiling")
             return
 
         if not profile_pattern.allowed(f"{workspace_name}.{dataset.name}.{table.name}"):

From 
9f6d82dfc679534dc62bf1911f9694c28c0d9962 Mon Sep 17 00:00:00 2001 From: Teppo Naakka Date: Sat, 2 Mar 2024 13:24:30 +0200 Subject: [PATCH 09/15] style: code style improvements --- .../src/datahub/ingestion/source/powerbi/powerbi.py | 2 +- .../source/powerbi/rest_api_wrapper/profiling_utils.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index 956507037bc8eb..34544c1d7782a5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -504,7 +504,7 @@ def extract_profile( f"Table {table.name} in {dataset.name}, not allowed for profiling" ) return - logger.info(f"Profiling table: {table.name}") + logger.debug(f"Profiling table: {table.name}") profile = DatasetProfileClass(timestampMillis=builder.get_sys_time()) profile.rowCount = table.row_count diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py index 409be69f1091eb..ea5da7c990d058 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py @@ -1,13 +1,13 @@ import re -from typing import Dict, List +from typing import Dict, List, Optional -def get_column_name(table_and_col: str) -> str: +def get_column_name(table_and_col: str) -> Optional[str]: regex = re.compile(".*\\[(.*)\\]$") m = regex.match(table_and_col) if m: return m.group(1) - return "" + return None def process_sample_result(result_data: dict) -> dict: From bac8c557de5850278b63c7c3fe01e250abc5127d Mon Sep 17 00:00:00 2001 From: Teppo Naakka Date: Sat, 2 Mar 2024 13:53:11 +0200 Subject: [PATCH 10/15] docs: explain reasoning behind includeNulls --- .../source/powerbi/rest_api_wrapper/data_resolver.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py index 4b84acfe82d295..ff26262f57ed31 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py @@ -547,6 +547,9 @@ def _execute_profiling_query(self, dataset: PowerBIDataset, query: str) -> dict: ) # Hit PowerBi logger.info(f"Request to query endpoint URL={dataset_query_endpoint}") + + # Serializer is configured to include nulls so that the queried fields + # exist in the returned payloads. 
Only failed queries will result in a KeyError.
         payload = {
             "queries": [
                 {

From 8c0b7f1a22477b70e0513ca8e4b3198288c77ab4 Mon Sep 17 00:00:00 2001
From: Teppo Naakka
Date: Sat, 2 Mar 2024 13:54:45 +0200
Subject: [PATCH 11/15] fix: mypy errors

---
 .../source/powerbi/rest_api_wrapper/profiling_utils.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py
index ea5da7c990d058..e929cd54f59fd8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py
@@ -18,7 +18,11 @@ def process_sample_result(result_data: dict) -> dict:
         if not value:
             continue
         column_name = get_column_name(key)
-        if column_name not in sample_data_by_column:
+
+        if not column_name:
+            continue
+
+        if column_name not in sample_data_by_column:
             sample_data_by_column[column_name] = []
         sample_data_by_column[column_name].append(str(value))
     return sample_data_by_column
@@ -32,6 +36,10 @@ def process_column_result(result_data: dict) -> dict:
         if not value:
             continue
         column_name = get_column_name(key)
+
+        if not column_name:
+            continue
+
         if column_name != "unique_count":
             value = str(value)
         sample_data_by_column[column_name] = value

From fddb3a43d7d3246fb1360d4a66b6469ee0968f41 Mon Sep 17 00:00:00 2001
From: Teppo Naakka
Date: Sun, 3 Mar 2024 15:10:20 +0200
Subject: [PATCH 12/15] docs: update docs and parameter definitions

---
 metadata-ingestion/docs/sources/powerbi/powerbi_pre.md |  4 ++++
 .../docs/sources/powerbi/powerbi_recipe.yml            | 10 ++++++++++
 .../src/datahub/ingestion/source/powerbi/config.py     |  2 +-
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md b/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md
index 43ffc44413b648..360dabf0843767 100644
--- a/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md
+++ b/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md
@@ -111,6 +111,10 @@ Please note that the default implementation overwrites tags for the ingested ent
 
 The profiling implementation is done through querying [DAX query endpoint](https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/execute-queries). Therefore the principal needs to have permission to query the datasets to be profiled. Usually this means that the service principal should have the `Contributor` role for the workspace to be ingested. Profiling is done with column-based queries to be able to handle wide datasets without timeouts.
 
+Take into account that the profiling implementation executes a fairly large number of DAX queries, and for big datasets this places a substantial load on the PowerBI system.
+
+The `profile_pattern` setting may be used to limit profiling actions to only a certain set of resources in PowerBI. 
Both allow and deny rules are matched against the following pattern for every table in a PowerBI Dataset: `workspace_name.dataset_name.table_name`. Users may limit profiling with these settings at the table, dataset, or workspace level.
+
 ## Admin Ingestion vs. Basic Ingestion
 PowerBI provides two sets of API i.e. [Basic API and Admin API](https://learn.microsoft.com/en-us/rest/api/power-bi/).

diff --git a/metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml b/metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml
index 31eed0bddaa6a8..ebd3dd50cfebc7 100644
--- a/metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml
+++ b/metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml
@@ -64,6 +64,16 @@ source:
     # extract powerbi dataset table schema
     extract_dataset_schema: true
 
+    # Enable PowerBI dataset profiling
+    profiling:
+      enabled: false
+    # Pattern to limit which resources to profile
+    # The matched resource format is:
+    # workspace_name.dataset_name.table_name
+    profile_pattern:
+      deny:
+        - .*
+
 sink:
   # sink configs

diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
index 3dd418f1eaaed2..f03ecb71577957 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
@@ -422,7 +422,7 @@ class PowerBiDashboardSourceConfig(
     profile_pattern: AllowDenyPattern = pydantic.Field(
         default=AllowDenyPattern.allow_all(),
         description="Regex patterns to filter tables for profiling during ingestion. Note that only tables "
-        "allowed by the `table_pattern` will be considered. Matched format is 'datasetname.tablename'",
+        "allowed by the `table_pattern` will be considered. 
Matched format is 'workspacename.datasetname.tablename'", ) profiling: PowerBiProfilingConfig = PowerBiProfilingConfig() From db0742070c052212ea4c63bbbc9f887de23ad0de Mon Sep 17 00:00:00 2001 From: Teppo Naakka Date: Sun, 3 Mar 2024 15:11:32 +0200 Subject: [PATCH 13/15] style: fix formatting --- .../source/powerbi/rest_api_wrapper/profiling_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py index e929cd54f59fd8..35e4cea41264ad 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/profiling_utils.py @@ -22,7 +22,7 @@ def process_sample_result(result_data: dict) -> dict: if not column_name: continue - if column_name not in sample_data_by_column: + if column_name not in sample_data_by_column: sample_data_by_column[column_name] = [] sample_data_by_column[column_name].append(str(value)) return sample_data_by_column From bfdb963997d86a411223286ff3f124026216bc6e Mon Sep 17 00:00:00 2001 From: Teppo Naakka Date: Fri, 10 May 2024 10:56:36 +0300 Subject: [PATCH 14/15] docs: dataset profiling is unavailable through admin api --- metadata-ingestion/docs/sources/powerbi/powerbi_pre.md | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md b/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md index 360dabf0843767..c969d2f79f4bfa 100644 --- a/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md +++ b/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md @@ -147,6 +147,7 @@ If you don't want to add a service principal as a member in your workspace, then Caveats of setting `admin_apis_only` to `true`: - Report's pages would not get ingested as page API is not available in PowerBI Admin API - [PowerBI Parameters](https://learn.microsoft.com/en-us/power-query/power-query-query-parameters) would not get resolved to actual values while processing M-Query for table lineage + - Dataset profiling is unavailable, as it requires access to the workspace API ### Basic Ingestion: Service Principal As Member In Workspace From 81c7a31dd9dbc7fb0f55e9540ac3f6ef3a68f60f Mon Sep 17 00:00:00 2001 From: Teppo Naakka Date: Fri, 10 May 2024 11:26:29 +0300 Subject: [PATCH 15/15] lint: fix linting --- .../src/datahub/ingestion/source/powerbi/powerbi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index 4217d6ac67428a..e3ca9783aced82 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -53,7 +53,6 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionSourceBase, ) -from datahub.metadata._schema_classes import DatasetProfileClass from datahub.metadata.com.linkedin.pegasus2avro.common import ChangeAuditStamps from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( FineGrainedLineage, @@ -70,6 +69,7 @@ DashboardKeyClass, DatasetFieldProfileClass, DatasetLineageTypeClass, + DatasetProfileClass, DatasetPropertiesClass, GlobalTagsClass, OtherSchemaClass,
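
Taken together, the profiling flow this series adds boils down to one small DAX query per column, POSTed to the dataset's `executeQueries` endpoint. Below is a minimal, illustrative sketch of that flow, not the connector's actual code: the bearer token and the `'articles'[view_count]` table/column are made-up assumptions, and only the dataset id is borrowed from the test fixtures above.

```python
import requests

# Assumptions for illustration only: the token would normally be acquired
# via MSAL/azure-identity, and 'articles'[view_count] is a made-up column.
ACCESS_TOKEN = "<aad-bearer-token>"
DATASET_ID = "05169CD2-E713-41E6-9600-1D8066D95445"
ENDPOINT = f"https://api.powerbi.com/v1.0/myorg/datasets/{DATASET_ID}/executeQueries"

# One small DAX query per column keeps individual requests cheap, which is
# what lets wide datasets be profiled without timeouts.
DAX = """
EVALUATE ROW(
    "min", MIN('articles'[view_count]),
    "max", MAX('articles'[view_count]),
    "unique_count", DISTINCTCOUNT('articles'[view_count])
)
"""

response = requests.post(
    ENDPOINT,
    json={
        "queries": [{"query": DAX}],
        # Include nulls so every queried field exists in the returned rows.
        "serializerSettings": {"includeNulls": True},
    },
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
)
response.raise_for_status()

# Result rows key their values in "[column]" style, mirroring the test mock:
# {"[min]": 3, "[max]": 34333, "[unique_count]": 15}
row = response.json()["results"][0]["tables"][0]["rows"][0]
print(row["[min]"], row["[max]"], row["[unique_count]"])
```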
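
Similarly, the `profile_pattern` semantics documented in patch 12 can be sanity-checked in isolation with the same `AllowDenyPattern` class the connector imports; the workspace, dataset, and table names below are illustrative only.

```python
from datahub.configuration.common import AllowDenyPattern

# Patterns are matched against "workspace_name.dataset_name.table_name".
pattern = AllowDenyPattern(
    allow=[r"demo-workspace\.library-dataset\..*"],  # one dataset only
    deny=[r".*\.articles"],                          # except this table
)

assert pattern.allowed("demo-workspace.library-dataset.issues")
assert not pattern.allowed("demo-workspace.library-dataset.articles")
assert not pattern.allowed("other-workspace.sales.orders")
```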
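
And the "[column]"-style result keys are unpacked by `get_column_name`; here is a standalone restatement of the helper from `profiling_utils.py`, with illustrative assertions covering the `None` case that patch 11 guards against.

```python
import re
from typing import Optional


def get_column_name(table_and_col: str) -> Optional[str]:
    # Same regex as profiling_utils.py: capture the text inside the
    # trailing "[...]", or return None when there is no bracketed name.
    m = re.compile(".*\\[(.*)\\]$").match(table_and_col)
    return m.group(1) if m else None


assert get_column_name("articles[view_count]") == "view_count"
assert get_column_name("[unique_count]") == "unique_count"
assert get_column_name("no_brackets") is None  # callers skip such keys
```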