Skip to content

Commit

Permalink
fix(ingest/redshift): tweak lineage v2 queries (#10045)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Mar 15, 2024
1 parent 3f85896 commit d4d175d
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 6 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/check_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def extract_sql_agg_log(query_log_file: str, output: Optional[str]) -> None:

if output:
with open(output, "w") as f:
json.dump(queries, f, indent=2)
json.dump(queries, f, indent=2, default=str)
logger.info(f"Extracted {len(queries)} queries to {output}")
else:
click.echo(json.dumps(queries, indent=2))
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\n','\\n'), '(CREATE(?:[\\n\\s\\t]+(?:temp|temporary))?(?:[\\n\\s\\t]+)table(?:[\\n\\s\\t]+)[^\\n\\s\\t()-]+)', 0, 1, 'ipe'),'[\\n\\s\\t]+',' ',1,'p') as create_command,
query_text,
row_number() over (
partition by TRIM(query_text)
partition by session_id, TRIM(query_text)
order by start_time desc
) rn
from
Expand Down Expand Up @@ -957,6 +957,8 @@ def list_copy_commands_sql(
# also similar happens if for example table name contains special characters quoted with " i.e. "test-table1"
# it is also worth noting that "query_type" field from SYS_QUERY_HISTORY could be probably used to improve many
# of complicated queries in this file
# However, note that we can't really use this query fully everywhere, despite it being simpler, because
# the SYS_QUERY_TEXT.text field is truncated to 4000 characters and strips out linebreaks.
@staticmethod
def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
start_time_str: str = start_time.strftime(redshift_datetime_format)
Expand All @@ -976,7 +978,7 @@ def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
query_text,
REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\n','\\n'), '(CREATE(?:[\\n\\s\\t]+(?:temp|temporary))?(?:[\\n\\s\\t]+)table(?:[\\n\\s\\t]+)[^\\n\\s\\t()-]+)', 0, 1, 'ipe'),'[\\n\\s\\t]+',' ',1,'p') AS create_command,
ROW_NUMBER() OVER (
PARTITION BY query_text
PARTITION BY session_id, query_text
ORDER BY start_time DESC
) rn
FROM
Expand Down Expand Up @@ -1011,6 +1013,7 @@ def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
)
WHERE
rn = 1
ORDER BY start_time ASC
;
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def get_sql_alchemy_url(self):

@platform_name("SQLAlchemy", id="sqlalchemy")
@config_class(SQLAlchemyGenericConfig)
@support_status(SupportStatus.CERTIFIED)
@support_status(SupportStatus.INCUBATING)
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
class SQLAlchemyGenericSource(SQLAlchemySource):
Expand Down
9 changes: 9 additions & 0 deletions metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class SchemaResolverInterface(Protocol):
def platform(self) -> str:
...

def includes_temp_tables(self) -> bool:
...

def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
...

Expand Down Expand Up @@ -74,6 +77,9 @@ def __init__(
def platform(self) -> str:
return self._platform

def includes_temp_tables(self) -> bool:
return False

def get_urns(self) -> Set[str]:
return set(k for k, v in self._schema_cache.items() if v is not None)

Expand Down Expand Up @@ -246,6 +252,9 @@ def __init__(
def platform(self) -> str:
return self._base_resolver.platform

def includes_temp_tables(self) -> bool:
return True

def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
urn = self._base_resolver.get_urn_for_table(
table, lower=self._base_resolver._prefers_urn_lower()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class QueryMetadata:
column_lineage: List[ColumnLineageInfo]
confidence_score: float

used_temp_tables: bool = True

def make_created_audit_stamp(self) -> models.AuditStampClass:
return models.AuditStampClass(
time=make_ts_millis(self.latest_timestamp) or 0,
Expand Down Expand Up @@ -166,6 +168,9 @@ class SqlAggregatorReport(Report):
queries_with_temp_upstreams: LossyDict[QueryId, LossyList] = dataclasses.field(
default_factory=LossyDict
)
queries_with_non_authoritative_session: LossyList[QueryId] = dataclasses.field(
default_factory=LossyList
)

# Lineage-related.
schema_resolver_count: Optional[int] = None
Expand Down Expand Up @@ -527,6 +532,7 @@ def add_observed_query(
schema_resolver: SchemaResolverInterface = (
self._make_schema_resolver_for_session(session_id)
)
session_has_temp_tables = schema_resolver.includes_temp_tables()

# Run the SQL parser.
parsed = self._run_sql_parser(
Expand Down Expand Up @@ -603,6 +609,7 @@ def add_observed_query(
upstreams=parsed.in_tables,
column_lineage=parsed.column_lineage or [],
confidence_score=parsed.debug_info.confidence,
used_temp_tables=session_has_temp_tables,
)
)

Expand Down Expand Up @@ -783,10 +790,21 @@ def _add_to_query_map(
# This assumes that queries come in order of increasing timestamps,
# so the current query is more authoritative than the previous one.
current.formatted_query_string = new.formatted_query_string
current.session_id = new.session_id
current.latest_timestamp = new.latest_timestamp or current.latest_timestamp
current.actor = new.actor or current.actor

if current.used_temp_tables and not new.used_temp_tables:
# If we see the same query again, but in a different session,
# it's possible that we didn't capture the temp tables in the newer session,
# but did in the older one. If that happens, we treat the older session's
# lineage as more authoritative. This isn't technically correct, but it's
# better than using the newer session's lineage, which is likely incorrect.
self.report.queries_with_non_authoritative_session.append(
query_fingerprint
)
return
current.session_id = new.session_id

if not merge_lineage:
# An invariant of the fingerprinting is that if two queries have the
# same fingerprint, they must also have the same lineage. We overwrite
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/redshift_query_mocker.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def mock_stl_insert_table_cursor(cursor: MagicMock) -> None:
"\\\\n\\\\s\\\\t]+(?:temp|temporary))?(?:[\\\\n\\\\s\\\\t]+)table(?:[\\\\n\\\\s\\\\t]+)["
"^\\\\n\\\\s\\\\t()-]+)', 0, 1, 'ipe'),'[\\\\n\\\\s\\\\t]+',' ',1,'p') as create_command,\n "
" query_text,\n row_number() over (\n partition "
"by TRIM(query_text)\n order by start_time desc\n ) rn\n "
"by session_id, TRIM(query_text)\n order by start_time desc\n ) rn\n "
" from\n (\n select\n pid "
"as session_id,\n xid as transaction_id,\n starttime "
"as start_time,\n type,\n query_text,\n "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.customer,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:composite_ad747ecae933492280d24dfa7f3a4ae3a3c67457e145803d05f7d8bd7efa7d17"
},
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:composite_ad747ecae933492280d24dfa7f3a4ae3a3c67457e145803d05f7d8bd7efa7d17"
},
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_survey,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:composite_ad747ecae933492280d24dfa7f3a4ae3a3c67457e145803d05f7d8bd7efa7d17"
},
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:composite_638945c382e30206a8f8a57894d375e5f6f2a3562fe68480badf37e38e836d75"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),customer_id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),customer_id)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_638945c382e30206a8f8a57894d375e5f6f2a3562fe68480badf37e38e836d75"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),customer_id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),customer_id)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_ad747ecae933492280d24dfa7f3a4ae3a3c67457e145803d05f7d8bd7efa7d17"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.customer,PROD),customer_email)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),customer_email)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_ad747ecae933492280d24dfa7f3a4ae3a3c67457e145803d05f7d8bd7efa7d17"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD),return_date)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),return_date)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_638945c382e30206a8f8a57894d375e5f6f2a3562fe68480badf37e38e836d75"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD),return_date)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),return_date)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_ad747ecae933492280d24dfa7f3a4ae3a3c67457e145803d05f7d8bd7efa7d17"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_survey,PROD),return_reason)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD),return_reason)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_ad747ecae933492280d24dfa7f3a4ae3a3c67457e145803d05f7d8bd7efa7d17"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:composite_ad747ecae933492280d24dfa7f3a4ae3a3c67457e145803d05f7d8bd7efa7d17",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "CREATE TABLE #stage_online_returns AS\nSELECT\n online_ret.customer_id,\n customer.customer_email,\n online_ret.return_date,\n online_survey.return_reason\nFROM online_returns AS online_ret\nLEFT JOIN customer\n ON online_ret.customer_id = customer.customer_id\nLEFT JOIN online_survey\n ON online_ret.customer_id = online_survey.customer_id\n AND online_ret.return_id = online_survey.event_id;\n\nINSERT INTO all_returns (\n customer_id,\n customer_email,\n return_date,\n return_reason\n)\nSELECT\n customer_id,\n customer_email,\n return_date,\n return_reason\nFROM #stage_online_returns",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
}
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:composite_ad747ecae933492280d24dfa7f3a4ae3a3c67457e145803d05f7d8bd7efa7d17",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.customer,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_returns,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.online_survey,PROD)"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:composite_638945c382e30206a8f8a57894d375e5f6f2a3562fe68480badf37e38e836d75",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "CREATE TABLE #stage_in_person_returns AS\nSELECT\n ipr.customer_id,\n customer.customer_email,\n ipr.return_date\nFROM in_person_returns AS ipr\nLEFT JOIN customer\n ON in_person_returns.customer_id = customer.customer_id;\n\nINSERT INTO all_returns (\n customer_id,\n customer_email,\n return_date\n)\nSELECT\n customer_id,\n customer_email,\n return_date\nFROM #stage_in_person_returns",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
}
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:composite_638945c382e30206a8f8a57894d375e5f6f2a3562fe68480badf37e38e836d75",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.all_returns,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.customer,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.in_person_returns,PROD)"
}
]
}
}
}
]
Loading

0 comments on commit d4d175d

Please sign in to comment.