Skip to content

Commit

Permalink
fix(redshift) Properly handling database alias in redshift usage and redshift lineage generation (#4473)
Browse files Browse the repository at this point in the history

* Fix database-alias in redshift usage and redshift lineage generation
  • Loading branch information
treff7es authored Mar 23, 2022
1 parent 4c1d049 commit 5c80177
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
14 changes: 8 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,6 @@ def _populate_lineage_map(

def _populate_lineage(self) -> None:

db_name = self.get_db_name()

stl_scan_based_lineage_query: str = """
select
distinct cluster,
Expand Down Expand Up @@ -736,7 +734,8 @@ def _populate_lineage(self) -> None:
scan_type in (1, 2, 3)
order by cluster, target_schema, target_table, starttime asc
""".format(
db_name=db_name,
# We need the original database name for filtering
db_name=self.config.database,
start_time=self.config.start_time.strftime(redshift_datetime_format),
end_time=self.config.end_time.strftime(redshift_datetime_format),
)
Expand Down Expand Up @@ -835,7 +834,8 @@ def _populate_lineage(self) -> None:
) as target_tables
order by cluster, target_schema, target_table, starttime asc
""".format(
db_name=db_name,
# We need the original database name for filtering
db_name=self.config.database,
start_time=self.config.start_time.strftime(redshift_datetime_format),
end_time=self.config.end_time.strftime(redshift_datetime_format),
)
Expand All @@ -858,7 +858,8 @@ def _populate_lineage(self) -> None:
and si.starttime < '{end_time}'
order by target_schema, target_table, starttime asc
""".format(
db_name=db_name,
# We need the original database name for filtering
db_name=self.config.database,
start_time=self.config.start_time.strftime(redshift_datetime_format),
end_time=self.config.end_time.strftime(redshift_datetime_format),
)
Expand Down Expand Up @@ -915,7 +916,8 @@ def get_lineage_mcp(
if dataset_key is None:
return None, None

if not self._lineage_map:
if self._lineage_map is None:
logger.debug("Populating lineage")
self._populate_lineage()
assert self._lineage_map is not None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE ss.starttime >= '{start_time}'
AND ss.starttime < '{end_time}'
AND sti.database = '{database}'
AND sq.aborted = 0
ORDER BY ss.endtime DESC;
""".strip()
Expand Down Expand Up @@ -175,6 +176,7 @@ def _make_usage_query(self, query: str) -> str:
return query.format(
start_time=self.config.start_time.strftime(redshift_datetime_format),
end_time=self.config.end_time.strftime(redshift_datetime_format),
database=self.config.database,
)

def _make_redshift_operation_aspect_query(self, table_name: str) -> str:
Expand Down Expand Up @@ -265,6 +267,9 @@ def _get_joined_access_event(self, events):
logging.info("An access event parameter(s) is missing. Skipping ....")
continue

if self.config.database_alias:
event_dict["database"] = self.config.database_alias

if not event_dict.get("usename") or event_dict["usename"] == "":
logging.info("The username parameter is missing. Skipping ....")
continue
Expand Down

0 comments on commit 5c80177

Please sign in to comment.