feat(ingest/powerbi): powerbi dataset profiling #9355
@@ -107,6 +107,10 @@ By default, extracting endorsement information to tags is disabled. The feature
Please note that the default implementation overwrites tags for the ingested entities. If you need to preserve existing tags, consider using a [transformer](../../../../metadata-ingestion/docs/transformer/dataset_transformer.md#simple-add-dataset-globaltags) with `semantics: PATCH` instead of `OVERWRITE`.
## Profiling
Profiling is implemented by querying the [DAX query endpoint](https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/execute-queries), so the principal needs permission to query the datasets that are to be profiled. Profiling uses column-based queries so that wide datasets can be handled without timeouts.
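For illustration, here is a minimal sketch (not the connector's actual code) of the kind of single-column DAX query sent to the Execute Queries endpoint; the table and column names and the access token are placeholders.

```python
# Minimal sketch of a column-level DAX profiling query against the Power BI
# Execute Queries REST endpoint. Names and token are placeholders.
import requests


def profile_column(access_token: str, dataset_id: str, table: str, column: str) -> dict:
    dax = (
        "EVALUATE ROW("
        f"\"min\", MIN('{table}'[{column}]), "
        f"\"max\", MAX('{table}'[{column}]), "
        f"\"unique_count\", DISTINCTCOUNT('{table}'[{column}]))"
    )
    response = requests.post(
        f"https://api.powerbi.com/v1.0/myorg/datasets/{dataset_id}/executeQueries",
        headers={"Authorization": f"Bearer {access_token}"},
        json={"queries": [{"query": dax}]},
    )
    response.raise_for_status()
    # The first row of the first result table carries the aggregated values.
    return response.json()["results"][0]["tables"][0]["rows"][0]
```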
## Admin Ingestion vs. Basic Ingestion

PowerBI provides two sets of APIs, i.e. the [Basic API and the Admin API](https://learn.microsoft.com/en-us/rest/api/power-bi/).
Review comment: Could you please update the
Reply: Sorry for the delay, I added a mention that dataset profiling is not available through the Admin API.
@@ -43,6 +43,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionSourceBase,
)
from datahub.metadata._schema_classes import DatasetProfileClass
from datahub.metadata.com.linkedin.pegasus2avro.common import ChangeAuditStamps
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    FineGrainedLineage,
@@ -58,6 +59,7 @@
    CorpUserKeyClass,
    DashboardInfoClass,
    DashboardKeyClass,
    DatasetFieldProfileClass,
    DatasetLineageTypeClass,
    DatasetPropertiesClass,
    GlobalTagsClass,
@@ -473,9 +475,62 @@ def to_datahub_dataset(
                    Constant.DATASET,
                    dataset.tags,
                )
            self.extract_profile(dataset_mcps, workspace, dataset, table, ds_urn)

        return dataset_mcps

    def extract_profile(
        self,
        dataset_mcps: List[MetadataChangeProposalWrapper],
        workspace: powerbi_data_classes.Workspace,
        dataset: powerbi_data_classes.PowerBIDataset,
        table: powerbi_data_classes.Table,
        ds_urn: str,
    ) -> None:
        if not self.__config.profiling.enabled:
            # Profiling not enabled
            return

        if not self.__config.profile_pattern.allowed(
            f"{workspace.name}.{dataset.name}.{table.name}"
        ):
            logger.info(
                f"Table {table.name} in {dataset.name}, not allowed for profiling"
            )
            return
        logger.info(f"Profiling table: {table.name}")
Review comment: Let's make it debug.
        profile = DatasetProfileClass(timestampMillis=builder.get_sys_time())
        profile.rowCount = table.row_count
        profile.fieldProfiles = []

        columns: List[
            Union[powerbi_data_classes.Column, powerbi_data_classes.Measure]
        ] = [*(table.columns or []), *(table.measures or [])]
        for column in columns:
            allowed_column = self.__config.profile_pattern.allowed(
                f"{workspace.name}.{dataset.name}.{table.name}.{column.name}"
Review comment: Could you please give example value of
            )
            if column.isHidden or not allowed_column:
                logger.info(f"Column {column.name} not allowed for profiling")
                continue
            field_profile = DatasetFieldProfileClass(column.name or "")
            field_profile.sampleValues = column.sample_values
            field_profile.min = column.min
            field_profile.max = column.max
            field_profile.uniqueCount = column.unique_count
            profile.fieldProfiles.append(field_profile)

        profile.columnCount = table.column_count

        mcp = MetadataChangeProposalWrapper(
            entityType="dataset",
            entityUrn=ds_urn,
            aspectName="datasetProfile",
            aspect=profile,
        )
        dataset_mcps.append(mcp)

    @staticmethod
    def transform_tags(tags: List[str]) -> GlobalTagsClass:
        return GlobalTagsClass(
@@ -96,6 +96,10 @@ class Column:
    columnType: Optional[str] = None
    expression: Optional[str] = None
    description: Optional[str] = None
    min: Optional[str] = None
    max: Optional[str] = None
    unique_count: Optional[int] = None
    sample_values: Optional[List[str]] = None


@dataclass
@@ -108,6 +112,10 @@ class Measure:
        BooleanTypeClass, DateTypeClass, NullTypeClass, NumberTypeClass, StringTypeClass
    ] = dataclasses.field(default_factory=NullTypeClass)
    description: Optional[str] = None
    min: Optional[str] = None
Review comment: Better to have a MeasureProfiling dataclass and add the reference here.
    max: Optional[str] = None
    unique_count: Optional[int] = None
    sample_values: Optional[List[str]] = None


@dataclass
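A rough sketch of the reviewer's suggestion above: keep the profiling fields in a dedicated dataclass and reference it from `Column` and `Measure`. The `MeasureProfile` name and shape are illustrative assumptions, not code from this PR.

```python
import dataclasses
from typing import List, Optional


@dataclasses.dataclass
class MeasureProfile:
    # Column/measure-level statistics gathered by the profiler.
    min: Optional[str] = None
    max: Optional[str] = None
    unique_count: Optional[int] = None
    sample_values: Optional[List[str]] = None


@dataclasses.dataclass
class Measure:
    name: str
    # Reference the shared profiling dataclass instead of repeating its fields.
    measure_profile: MeasureProfile = dataclasses.field(default_factory=MeasureProfile)
```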
@@ -117,6 +125,8 @@ class Table:
    expression: Optional[str] = None
    columns: Optional[List[Column]] = None
    measures: Optional[List[Measure]] = None
    row_count: Optional[int] = None
    column_count: Optional[int] = None

    # Pointer to the parent dataset.
    dataset: Optional["PowerBIDataset"] = None
@@ -26,6 +26,9 @@
    AdminAPIResolver,
    RegularAPIResolver,
)
from datahub.ingestion.source.powerbi.rest_api_wrapper.powerbi_profiler import (
    PowerBiDatasetProfilingResolver,
)

# Logger instance
logger = logging.getLogger(__name__)
@@ -47,6 +50,13 @@ def __init__(self, config: PowerBiDashboardSourceConfig) -> None:
            tenant_id=self.__config.tenant_id,
        )

        self.__profiling_resolver = PowerBiDatasetProfilingResolver(
Review comment: There are only two ways to get metadata from PowerBI: the regular API or the Admin API. You can clearly see this segregation in the PowerBI REST API documentation: https://learn.microsoft.com/en-us/rest/api/power-bi/. As per the docs this looks like it comes under the regular API, so please add profile_dataset to the base resolver and provide implementations in both the regular and admin resolvers (in the admin resolver it is just pass). Invoke powerbi_profiler.py from the regular API's profile_dataset method.
            client_id=self.__config.client_id,
            client_secret=self.__config.client_secret,
            tenant_id=self.__config.tenant_id,
            config=self.__config,
        )

    def log_http_error(self, message: str) -> Any:
        logger.warning(message)
        _, e, _ = sys.exc_info()
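The sketch below illustrates the structure the reviewer asks for in the comment above: a `profile_dataset` hook on the shared resolver base class, implemented by the regular resolver and stubbed out in the admin resolver. The base-class name and the method signature are assumptions for illustration, not the repository's actual code.

```python
from abc import ABC, abstractmethod
from typing import Any


class DataResolverBase(ABC):  # base-class name assumed for illustration
    @abstractmethod
    def profile_dataset(self, dataset: Any, table: Any, workspace_name: str) -> None:
        ...


class RegularAPIResolver(DataResolverBase):
    def profile_dataset(self, dataset: Any, table: Any, workspace_name: str) -> None:
        # Delegate to the DAX-based profiler (powerbi_profiler.py).
        ...


class AdminAPIResolver(DataResolverBase):
    def profile_dataset(self, dataset: Any, table: Any, workspace_name: str) -> None:
        # Dataset profiling is not available through the Admin API.
        pass
```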
@@ -286,11 +296,12 @@ def _parse_endorsement(endorsements: Optional[dict]) -> List[str]:

        return [endorsement]

    def _get_workspace_datasets(self, scan_result: Optional[dict]) -> dict:
    def _get_workspace_datasets(self, workspace: Workspace) -> dict:
        """
        Filter out "dataset" from scan_result and return Dataset instance set
        """
        dataset_map: dict = {}
        scan_result = workspace.scan_result

        if scan_result is None:
            return dataset_map
@@ -344,30 +355,33 @@ def _get_workspace_datasets(self, scan_result: Optional[dict]) -> dict:
                    and len(table[Constant.SOURCE]) > 0
                    else None
                )
                dataset_instance.tables.append(
                    Table(
                        name=table[Constant.NAME],
                        full_name="{}.{}".format(
                            dataset_name.replace(" ", "_"),
                            table[Constant.NAME].replace(" ", "_"),
                        ),
                        expression=expression,
                        columns=[
                            Column(
                                **column,
                                datahubDataType=FIELD_TYPE_MAPPING.get(
                                    column["dataType"], FIELD_TYPE_MAPPING["Null"]
                                ),
                            )
                            for column in table.get("columns", [])
                        ],
                        measures=[
                            Measure(**measure) for measure in table.get("measures", [])
                        ],
                        dataset=dataset_instance,
                    )
                )
                table = Table(
                    name=table[Constant.NAME],
                    full_name="{}.{}".format(
                        dataset_name.replace(" ", "_"),
                        table[Constant.NAME].replace(" ", "_"),
                    ),
                    expression=expression,
                    columns=[
                        Column(
                            **column,
                            datahubDataType=FIELD_TYPE_MAPPING.get(
                                column["dataType"], FIELD_TYPE_MAPPING["Null"]
                            ),
                        )
                        for column in table.get("columns", [])
                    ],
                    measures=[
                        Measure(**measure) for measure in table.get("measures", [])
                    ],
                    dataset=dataset_instance,
                    row_count=None,
                    column_count=None,
                )

                self.__profiling_resolver.profile_dataset(
                    dataset_instance, table, workspace.name
                )
                dataset_instance.tables.append(table)
        return dataset_map

    def _fill_metadata_from_scan_result(
@@ -392,9 +406,7 @@ def _fill_metadata_from_scan_result(
                independent_datasets=[],
            )
            cur_workspace.scan_result = workspace_metadata
            cur_workspace.datasets = self._get_workspace_datasets(
                cur_workspace.scan_result
            )
            cur_workspace.datasets = self._get_workspace_datasets(cur_workspace)

            # Fetch endorsements tag if it is enabled from configuration
            if self.__config.extract_endorsements_to_tags:
Review comment: Additional documentation needs to be added:
docs/quick-ingestion-guides/powerbi/setup.md

@capability(
    SourceCapability.DATA_PROFILING,
    "Optionally enabled via configuration profiling.enabled",
)
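A sketch of where the suggested capability declaration would sit, on the source class itself; the import paths follow the usual DataHub decorator pattern and should be verified against the repository.

```python
from datahub.ingestion.api.decorators import SourceCapability, capability
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionSourceBase,
)


@capability(
    SourceCapability.DATA_PROFILING,
    "Optionally enabled via configuration profiling.enabled",
)
class PowerBiDashboardSource(StatefulIngestionSourceBase):
    # Remainder of the source implementation unchanged.
    ...
```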