Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/powerbi): handle special character #(tab) in native query parsing #10520

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -391,18 +391,20 @@ class PowerBiDashboardSourceConfig(

# Enable advance sql construct
enable_advance_lineage_sql_construct: bool = pydantic.Field(
default=False,
default=True,
description="Whether to enable advance native sql construct for parsing like join, sub-queries. "
"along this flag , the native_query_parsing should be enabled. "
"By default convert_lineage_urns_to_lowercase is enabled, in-case if you have disabled it in previous ingestion execution then it may break lineage "
"By default convert_lineage_urns_to_lowercase is enabled, in-case if you have disabled it in previous "
"ingestion execution then it may break lineage "
"as this option generates the upstream datasets URN in lowercase.",
)

# Enable CLL extraction
extract_column_level_lineage: bool = pydantic.Field(
default=False,
description="Whether to extract column level lineage. "
"Works only if configs `native_query_parsing`, `enable_advance_lineage_sql_construct` & `extract_lineage` are enabled. "
"Works only if configs `native_query_parsing`, `enable_advance_lineage_sql_construct` & `extract_lineage` are "
"enabled. "
"Works for M-Query where native SQL is used for transformation.",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
create_lineage_sql_parsed_result,
)

SPECIAL_CHARACTERS = ["#(lf)", "(lf)"]
SPECIAL_CHARACTERS = ["#(lf)", "(lf)", "#(tab)"]

logger = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ def create_lineage(
self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[
data_access_tokens[0]
]

# First argument is the query
sql_query: str = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ def default_source_config():
},
"env": "DEV",
"extract_workspaces_to_containers": False,
"enable_advance_lineage_sql_construct": False,
}


Expand Down
42 changes: 42 additions & 0 deletions metadata-ingestion/tests/integration/powerbi/test_m_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
'let\n Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) \n in Source',
'let\n Source = Databricks.Catalogs("adb-123.azuredatabricks.net", "/sql/1.0/endpoints/12345dc91aa25844", [Catalog=null, Database=null]),\n hive_metastore_Database = Source{[Name="hive_metastore",Kind="Database"]}[Data],\n sandbox_revenue_Schema = hive_metastore_Database{[Name="sandbox_revenue",Kind="Schema"]}[Data],\n public_consumer_price_index_Table = sandbox_revenue_Schema{[Name="public_consumer_price_index",Kind="Table"]}[Data],\n #"Renamed Columns" = Table.RenameColumns(public_consumer_price_index_Table,{{"Country", "country"}, {"Metric", "metric"}}),\n #"Inserted Year" = Table.AddColumn(#"Renamed Columns", "ID", each Date.Year([date_id]) + Date.Month([date_id]), Text.Type),\n #"Added Custom" = Table.AddColumn(#"Inserted Year", "Custom", each Text.Combine({Number.ToText(Date.Year([date_id])), Number.ToText(Date.Month([date_id])), [country]})),\n #"Removed Columns" = Table.RemoveColumns(#"Added Custom",{"ID"}),\n #"Renamed Columns1" = Table.RenameColumns(#"Removed Columns",{{"Custom", "ID"}}),\n #"Filtered Rows" = Table.SelectRows(#"Renamed Columns1", each ([metric] = "Consumer Price Index") and (not Number.IsNaN([value])))\nin\n #"Filtered Rows"',
"let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS inner join OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT #(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source",
'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="ORDERING"]}[Data], "SELECT#(lf) DISTINCT#(lf) T5.PRESENTMENT_START_DATE#(lf),T5.PRESENTMENT_END_DATE#(lf),T5.DISPLAY_NAME#(lf),T5.NAME#(tab)#(lf),T5.PROMO_DISPLAY_NAME#(lf),T5.REGION#(lf),T5.ID#(lf),T5.WALKOUT#(lf),T6.DEAL_ID#(lf),T6.TYPE#(lf),T5.FREE_PERIOD#(lf),T6.PRICE_MODIFICATION#(lf)#(lf)FROM#(lf)#(lf)(#(lf) SELECT #(lf) T1.NAME#(lf),DATE(T1.CREATED_AT) as CREATED_AT#(lf),T1.PROMO_CODE#(lf),T1.STATUS#(lf),DATE(T1.UPDATED_AT) as UPDATED_AT#(lf),T1.ID#(lf),T1.DISPLAY_NAME as PROMO_DISPLAY_NAME#(lf),T4.*#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) NAME#(lf),CREATED_AT#(lf),PROMO_CODE#(lf),STATUS#(lf),UPDATED_AT#(lf),ID#(lf),DISPLAY_NAME#(lf) FROM RAW.PROMOTIONS#(lf)#(lf)) T1#(lf)INNER JOIN#(lf)#(lf) (#(lf) SELECT #(lf) T3.PRODUCT_STATUS#(lf),T3.CODE#(lf),T3.REGION#(lf),T3.DISPLAY_ORDER_SEQUENCE#(lf),T3.PRODUCT_LINE_ID#(lf),T3.DISPLAY_NAME#(lf),T3.PRODUCT_TYPE#(lf),T3.ID as PROD_TBL_ID#(lf),T3.NAME as PROD_TBL_NAME#(lf),DATE(T2.PRESENTMENT_END_DATE) as PRESENTMENT_END_DATE#(lf),T2.PRICE_COMMITMENT_PERIOD#(lf),T2.NAME as SEAL_TBL_NAME#(lf),DATE(T2.CREATED_AT) as SEAL_TBL_CREATED_AT#(lf),T2.DESCRIPTION#(lf),T2.FREE_PERIOD#(lf),T2.WALKOUT#(lf),T2.PRODUCT_CAT_ID#(lf),T2.PROMOTION_ID#(lf),DATE(T2.PRESENTMENT_START_DATE) as PRESENTMENT_START_DATE#(lf),YEAR(T2.PRESENTMENT_START_DATE) as DEAL_YEAR_START#(lf),MONTH(T2.PRESENTMENT_START_DATE) as DEAL_MONTH_START#(lf),T2.DEAL_TYPE#(lf),DATE(T2.UPDATED_AT) as SEAL_TBL_UPDATED_AT#(lf),T2.ID as SEAL_TBL_ID#(lf),T2.STATUS as SEAL_TBL_STATUS#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) PRODUCT_STATUS#(lf),CODE#(lf),REGION#(lf),DISPLAY_ORDER_SEQUENCE#(lf),PRODUCT_LINE_ID#(lf),DISPLAY_NAME#(lf),PRODUCT_TYPE#(lf),ID #(lf),NAME #(lf) FROM#(lf) RAW.PRODUCTS#(lf)#(lf)) T3#(lf)INNER JOIN#(lf)(#(lf) SELECT#(lf) DISTINCT#(lf) 
PRESENTMENT_END_DATE#(lf),PRICE_COMMITMENT_PERIOD#(lf),NAME#(lf),CREATED_AT#(lf),DESCRIPTION#(lf),FREE_PERIOD#(lf),WALKOUT#(lf),PRODUCT_CAT_ID#(lf),PROMOTION_ID#(lf),PRESENTMENT_START_DATE#(lf),DEAL_TYPE#(lf),UPDATED_AT#(lf),ID#(lf),STATUS#(lf) FROM#(lf) RAW.DEALS#(lf)#(lf)) T2#(lf)ON#(lf)T3.ID = T2.PRODUCT_CAT_ID #(lf)WHERE#(lf)T2.PRESENTMENT_START_DATE >= \'2015-01-01\'#(lf)AND#(lf)T2.STATUS = \'active\'#(lf)#(lf))T4#(lf)ON#(lf)T1.ID = T4.PROMOTION_ID#(lf))T5#(lf)INNER JOIN#(lf)RAW.PRICE_MODIFICATIONS T6#(lf)ON#(lf)T5.SEAL_TBL_ID = T6.DEAL_ID", null, [EnableFolding=true]) \n in \n Source',
]


Expand All @@ -59,6 +60,7 @@ def get_default_instances(
"tenant_id": "fake",
"client_id": "foo",
"client_secret": "bar",
"enable_advance_lineage_sql_construct": False,
**override_config,
}
)
Expand Down Expand Up @@ -763,3 +765,43 @@ def test_sqlglot_parser():
assert lineage[0].column_lineage[i].downstream.table is None
assert lineage[0].column_lineage[i].downstream.column == column
assert lineage[0].column_lineage[i].upstreams == []


def test_sqlglot_parser_2():
    """Check the upstream URNs resolved for the M-Query at ``M_QUERIES[25]``.

    The parser runs with ``native_query_parsing`` and
    ``enable_advance_lineage_sql_construct`` both switched on, and with the
    Snowflake server mapped to the ``sales_deployment`` platform instance in
    ``PROD``. The test asserts that exactly four upstream tables are returned
    and that their URNs match the expected list, in order.
    """
    # NOTE(review): index 25 is presumably the Snowflake query containing
    # `#(tab)` — confirm against the M_QUERIES list.
    table: powerbi_data_classes.Table = powerbi_data_classes.Table(
        expression=M_QUERIES[25],
        name="SALES_TARGET",
        full_name="dev.public.sales",
    )
    reporter = PowerBiDashboardSourceReport()

    # Overrides layered on top of the defaults from get_default_instances().
    source_config_overrides = {
        "server_to_platform_instance": {
            "0DD93C6BD5A6.snowflakecomputing.com": {
                "platform_instance": "sales_deployment",
                "env": "PROD",
            }
        },
        "native_query_parsing": True,
        "enable_advance_lineage_sql_construct": True,
    }
    ctx, config, platform_instance_resolver = get_default_instances(
        override_config=source_config_overrides
    )

    lineage: List[resolver.Lineage] = parser.get_upstream_tables(
        table,
        reporter,
        ctx=ctx,
        config=config,
        platform_instance_resolver=platform_instance_resolver,
    )

    # Only the first lineage entry is inspected.
    data_platform_tables: List[DataPlatformTable] = lineage[0].upstreams
    actual_urns = [upstream.urn for upstream in data_platform_tables]
    expected_urns = [
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.raw.deals,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.raw.price_modifications,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.raw.products,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.raw.promotions,PROD)",
    ]

    assert len(data_platform_tables) == 4
    assert actual_urns == expected_urns
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ def default_source_config():
},
"env": "DEV",
"extract_workspaces_to_containers": False,
"enable_advance_lineage_sql_construct": False,
}


Expand Down
Loading