diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index a46bb035a256c3..a689e9ee642aef 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -44,8 +44,7 @@ class Constant: TIME_STAMP = "time_stamp" STATUS = "status" USER_ID = "user_id" - GIVEN_NAME = "given_name" - FAMILY_NAME = "family_name" + EMAIL = "email" CONNECTOR_ID = "connector_id" CONNECTOR_NAME = "connector_name" CONNECTOR_TYPE_ID = "connector_type_id" diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py index 8f779e0cd6df22..4ae71b990e5cde 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py @@ -23,7 +23,7 @@ class Connector: paused: bool sync_frequency: int destination_id: str - user_name: Optional[str] + user_email: Optional[str] table_lineage: List[TableLineage] jobs: List["Job"] diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index b98db660b0ddc6..91b0101c10451b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -177,7 +177,7 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob: id=connector.connector_id, flow_urn=dataflow_urn, name=connector.connector_name, - owners={connector.user_name} if connector.user_name else set(), + owners={connector.user_email} if connector.user_email else set(), ) job_property_bag: Dict[str, str] = {} diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index af947150b0e3c6..96a4ec44ae67b0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -156,7 +156,7 @@ def _get_jobs_list(self, connector_id: str) -> List[Job]: ) return jobs - def _get_user_name(self, user_id: Optional[str]) -> Optional[str]: + def _get_user_email(self, user_id: Optional[str]) -> Optional[str]: if not user_id: return None user_details = self._query( @@ -166,7 +166,7 @@ def _get_user_name(self, user_id: Optional[str]) -> Optional[str]: if not user_details: return None - return f"{user_details[0][Constant.GIVEN_NAME]} {user_details[0][Constant.FAMILY_NAME]}" + return f"{user_details[0][Constant.EMAIL]}" def get_allowed_connectors_list( self, connector_patterns: AllowDenyPattern, report: FivetranSourceReport @@ -185,7 +185,7 @@ def get_allowed_connectors_list( paused=connector[Constant.PAUSED], sync_frequency=connector[Constant.SYNC_FREQUENCY], destination_id=connector[Constant.DESTINATION_ID], - user_name=self._get_user_name( + user_email=self._get_user_email( connector[Constant.CONNECTING_USER_ID] ), table_lineage=self._get_table_lineage( diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index df79b552ed980e..f1c818150c18f2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -25,7 +25,8 @@ def get_user_query(self, user_id: str) -> str: return f""" SELECT id as user_id, given_name, - family_name + family_name, + email FROM {self.db_clause}user WHERE id = '{user_id}'""" diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_bigquery_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_bigquery_golden.json index ae9e71f0953f41..fcf354d7a14055 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_bigquery_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_bigquery_golden.json @@ -24,6 +24,7 @@ "aspect": { "json": { "owners": [], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:fivetran" @@ -187,13 +188,14 @@ "json": { "owners": [ { - "owner": "urn:li:corpuser:Shubham Jagtap", + "owner": "urn:li:corpuser:abc.xyz@email.com", "type": "DEVELOPER", "source": { "type": "SERVICE" } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:fivetran" diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json index b8f05fa6e93aad..8545f433480755 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json @@ -24,6 +24,7 @@ "aspect": { "json": { "owners": [], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:fivetran" @@ -187,13 +188,14 @@ "json": { "owners": [ { - "owner": "urn:li:corpuser:Shubham Jagtap", + "owner": "urn:li:corpuser:abc.xyz@email.com", "type": "DEVELOPER", "source": { "type": "SERVICE" } } ], + "ownerTypes": {}, "lastModified": { "time": 0, "actor": "urn:li:corpuser:fivetran" diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index b3c7820111ae25..dbfe1011a41fac 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -79,6 +79,7 @@ def default_query_results( "user_id": "reapply_phone", "given_name": "Shubham", "family_name": "Jagtap", + "email": "abc.xyz@email.com", } ] elif query == fivetran_log_query.get_sync_start_logs_query("calendar_elected"):