diff --git a/datahub-web-react/src/app/ingest/source/builder/constants.ts b/datahub-web-react/src/app/ingest/source/builder/constants.ts index dba8e8bb1dce6b..fdb094d721304b 100644 --- a/datahub-web-react/src/app/ingest/source/builder/constants.ts +++ b/datahub-web-react/src/app/ingest/source/builder/constants.ts @@ -29,6 +29,7 @@ import databricksLogo from '../../../../images/databrickslogo.png'; import verticaLogo from '../../../../images/verticalogo.png'; import mlflowLogo from '../../../../images/mlflowlogo.png'; import dynamodbLogo from '../../../../images/dynamodblogo.png'; +import fivetranLogo from '../../../../images/fivetranlogo.png'; export const ATHENA = 'athena'; export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`; @@ -105,6 +106,8 @@ export const DBT_CLOUD = 'dbt-cloud'; export const DBT_CLOUD_URN = `urn:li:dataPlatform:dbt`; export const VERTICA = 'vertica'; export const VERTICA_URN = `urn:li:dataPlatform:${VERTICA}`; +export const FIVETRAN = 'fivetran'; +export const FIVETRAN_URN = `urn:li:dataPlatform:${FIVETRAN}`; export const PLATFORM_URN_TO_LOGO = { [ATHENA_URN]: athenaLogo, @@ -138,6 +141,7 @@ export const PLATFORM_URN_TO_LOGO = { [SUPERSET_URN]: supersetLogo, [UNITY_CATALOG_URN]: databricksLogo, [VERTICA_URN]: verticaLogo, + [FIVETRAN_URN]: fivetranLogo, }; export const SOURCE_TO_PLATFORM_URN = { diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json index b18384909c33f0..9619abebbd54e6 100644 --- a/datahub-web-react/src/app/ingest/source/builder/sources.json +++ b/datahub-web-react/src/app/ingest/source/builder/sources.json @@ -216,6 +216,13 @@ "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/vertica/", "recipe": "source:\n type: vertica\n config:\n # Coordinates\n host_port: localhost:5433\n # The name of the vertica database\n database: Database_Name\n # Credentials\n username: Vertica_User\n password: Vertica_Password\n\n 
include_tables: true\n include_views: true\n include_projections: true\n include_models: true\n include_view_lineage: true\n include_projection_lineage: true\n profiling:\n enabled: false\n stateful_ingestion:\n enabled: true " }, + { + "urn": "urn:li:dataPlatform:fivetran", + "name": "fivetran", + "displayName": "Fivetran", + "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/fivetran/", + "recipe": "source:\n type: fivetran\n config:\n # Fivetran log connector destination server configurations\n fivetran_log_config:\n destination_platform: snowflake\n destination_config:\n # Coordinates\n account_id: snowflake_account_id\n warehouse: warehouse_name\n database: snowflake_db\n log_schema: fivetran_log_schema\n\n # Credentials\n username: ${SNOWFLAKE_USER}\n password: ${SNOWFLAKE_PASS}\n role: snowflake_role\n\n # Optional - filter for certain connector names instead of ingesting everything.\n # connector_patterns:\n # allow:\n # - connector_name\n\n # Optional -- This mapping is optional and only required to configure platform-instance for source\n # A mapping of Fivetran connector id to data platform instance\n # sources_to_platform_instance:\n # calendar_elected:\n # platform_instance: cloud_postgres_instance\n # env: DEV\n\n # Optional -- This mapping is optional and only required to configure platform-instance for destination.\n # A mapping of Fivetran destination id to data platform instance\n # destination_to_platform_instance:\n # calendar_elected:\n # platform_instance: cloud_postgres_instance\n # env: DEV" + }, { "urn": "urn:li:dataPlatform:custom", "name": "custom", diff --git a/datahub-web-react/src/images/fivetranlogo.png b/datahub-web-react/src/images/fivetranlogo.png new file mode 100644 index 00000000000000..d5c999ad2d86e9 Binary files /dev/null and b/datahub-web-react/src/images/fivetranlogo.png differ diff --git a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md 
new file mode 100644 index 00000000000000..949e408215e6f4 --- /dev/null +++ b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md @@ -0,0 +1,86 @@ +## Integration Details + +This source extracts the following: + +- Connectors in fivetran as Data Pipelines and Data Jobs to represent data lineage information between source and destination. +- Connector sources - DataJob input Datasets. +- Connector destination - DataJob output Datasets. +- Connector runs - DataProcessInstances as DataJob runs. + +## Configuration Notes + +1. Fivetran supports the fivetran platform connector to dump the log events and connectors, destinations, users and roles metadata in your destination. +2. You need to setup and start the initial sync of the fivetran platform connector before using this source. Refer [link](https://fivetran.com/docs/logs/fivetran-platform/setup-guide). +3. Once initial sync up of your fivetran platform connector is done, you need to provide the fivetran platform connector's destination platform and its configuration in the recipe. + +## Concept mapping + +| Fivetran | Datahub | +|--------------------------|--------------------------------------------------------------------------------------------------------| +| `Connector` | [DataJob](https://datahubproject.io/docs/generated/metamodel/entities/datajob/) | +| `Source` | [Dataset](https://datahubproject.io/docs/generated/metamodel/entities/dataset/) | +| `Destination` | [Dataset](https://datahubproject.io/docs/generated/metamodel/entities/dataset/) | +| `Connector Run` | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) | + +Source and destination are mapped to Dataset as an Input and Output of Connector. + +## Current limitations + +Works only for Snowflake destination for now. + +## Snowflake destination Configuration Guide +1. 
If your fivetran platform connector destination is snowflake, you need to provide user details and its role with correct privileges in order to fetch metadata. +2. Snowflake system admin can follow this guide to create a fivetran_datahub role, assign it the required privileges, and assign it to a user by executing the following Snowflake commands from a user with the ACCOUNTADMIN role or MANAGE GRANTS privilege. + +```sql +create or replace role fivetran_datahub; + +// Grant access to a warehouse to run queries to view metadata +grant operate, usage on warehouse "<your-warehouse>" to role fivetran_datahub; + +// Grant access to view database and schema in which your log and metadata tables exist +grant usage on DATABASE "<fivetran-log-database>" to role fivetran_datahub; +grant usage on SCHEMA "<fivetran-log-database>"."<fivetran-log-schema>" to role fivetran_datahub; + +// Grant access to execute select query on schema in which your log and metadata tables exist +grant select on all tables in SCHEMA "<fivetran-log-database>"."<fivetran-log-schema>" to role fivetran_datahub; + +// Grant the fivetran_datahub to the snowflake user. +grant role fivetran_datahub to user snowflake_user; +``` + +## Advanced Configurations + +### Working with Platform Instances +If you have multiple instances of source/destination systems that are referred to in your `fivetran` setup, you need to configure platform instances for these systems in the `fivetran` recipe to generate correct lineage edges. Refer to the document [Working with Platform Instances](https://datahubproject.io/docs/platform-instances) to understand more about this. + +When configuring the platform instance for a source system, provide the connector id as the key; for a destination system, provide the destination id as the key.
+ +#### Example - Multiple Postgres Source Connectors each reading from different postgres instance +```yml + # Map of connector source to platform instance + sources_to_platform_instance: + postgres_connector_id1: + platform_instance: cloud_postgres_instance + env: PROD + + postgres_connector_id2: + platform_instance: local_postgres_instance + env: DEV +``` + +#### Example - Multiple Snowflake Destinations each writing to different snowflake instance +```yml + # Map of destination to platform instance + destination_to_platform_instance: + snowflake_destination_id1: + platform_instance: prod_snowflake_instance + env: PROD + + snowflake_destination_id2: + platform_instance: dev_snowflake_instance + env: PROD +``` + + + diff --git a/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml b/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml new file mode 100644 index 00000000000000..7c654df59723c1 --- /dev/null +++ b/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml @@ -0,0 +1,43 @@ +source: + type: fivetran + config: + # Fivetran log connector destination server configurations + fivetran_log_config: + destination_platform: snowflake + destination_config: + # Coordinates + account_id: "abc48144" + warehouse: "COMPUTE_WH" + database: "MY_SNOWFLAKE_DB" + log_schema: "FIVETRAN_LOG" + + # Credentials + username: "${SNOWFLAKE_USER}" + password: "${SNOWFLAKE_PASS}" + role: "snowflake_role" + + # Optional - filter for certain connector names instead of ingesting everything. + # connector_patterns: + # allow: + # - connector_name + + # Optional -- A mapping of the connector's all sources to its database. 
+ # sources_to_database: + # connector_id: source_db + + # Optional -- This mapping is optional and only required to configure platform-instance for source + # A mapping of Fivetran connector id to data platform instance + # sources_to_platform_instance: + # connector_id: + # platform_instance: cloud_instance + # env: DEV + + # Optional -- This mapping is optional and only required to configure platform-instance for destination. + # A mapping of Fivetran destination id to data platform instance + # destination_to_platform_instance: + # destination_id: + # platform_instance: cloud_instance + # env: DEV + +sink: + # sink configs diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index afce8dcee840b4..2392fce0580613 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -395,6 +395,7 @@ "powerbi-report-server": powerbi_report_server, "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"}, "unity-catalog": databricks | sqllineage_lib, + "fivetran": snowflake_common, } # This is mainly used to exclude plugins from the Docker image. 
@@ -525,6 +526,7 @@ "nifi", "vertica", "mode", + "fivetran", "kafka-connect", ] if plugin @@ -629,6 +631,7 @@ "unity-catalog = datahub.ingestion.source.unity.source:UnityCatalogSource", "gcs = datahub.ingestion.source.gcs.gcs_source:GCSSource", "sql-queries = datahub.ingestion.source.sql_queries:SqlQueriesSource", + "fivetran = datahub.ingestion.source.fivetran.fivetran:FivetranSource", ], "datahub.ingestion.transformer.plugins": [ "simple_remove_dataset_ownership = datahub.ingestion.transformer.remove_dataset_ownership:SimpleRemoveDatasetOwnership", diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py index 0face6415bacc4..6c42e830e223b1 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py @@ -100,7 +100,9 @@ def generate_tags_aspect(self) -> Iterable[GlobalTagsClass]: ) return [tags] - def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: + def generate_mcp( + self, materialize_iolets: bool = True + ) -> Iterable[MetadataChangeProposalWrapper]: mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), aspect=DataJobInfoClass( @@ -113,7 +115,9 @@ def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: ) yield mcp - yield from self.generate_data_input_output_mcp() + yield from self.generate_data_input_output_mcp( + materialize_iolets=materialize_iolets + ) for owner in self.generate_ownership_aspect(): mcp = MetadataChangeProposalWrapper( @@ -144,7 +148,9 @@ def emit( for mcp in self.generate_mcp(): emitter.emit(mcp, callback) - def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: + def generate_data_input_output_mcp( + self, materialize_iolets: bool + ) -> Iterable[MetadataChangeProposalWrapper]: mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), aspect=DataJobInputOutputClass( @@ -157,10 +163,9 @@ def 
generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapp yield mcp # Force entity materialization - for iolet in self.inlets + self.outlets: - mcp = MetadataChangeProposalWrapper( - entityUrn=str(iolet), - aspect=StatusClass(removed=False), - ) - - yield mcp + if materialize_iolets: + for iolet in self.inlets + self.outlets: + yield MetadataChangeProposalWrapper( + entityUrn=str(iolet), + aspect=StatusClass(removed=False), + ) diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py index cf6080c7072e69..2f07e4a112f934 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py +++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py @@ -220,12 +220,10 @@ def emit_process_end( self._emit_mcp(mcp, emitter, callback) def generate_mcp( - self, created_ts_millis: Optional[int] = None + self, created_ts_millis: Optional[int] = None, materialize_iolets: bool = True ) -> Iterable[MetadataChangeProposalWrapper]: - """ - Generates mcps from the object - :rtype: Iterable[MetadataChangeProposalWrapper] - """ + """Generates mcps from the object""" + mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), aspect=DataProcessInstanceProperties( @@ -253,7 +251,7 @@ def generate_mcp( ) yield mcp - yield from self.generate_inlet_outlet_mcp() + yield from self.generate_inlet_outlet_mcp(materialize_iolets=materialize_iolets) @staticmethod def _emit_mcp( @@ -329,7 +327,9 @@ def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance": dpi._template_object = dataflow return dpi - def generate_inlet_outlet_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: + def generate_inlet_outlet_mcp( + self, materialize_iolets: bool + ) -> Iterable[MetadataChangeProposalWrapper]: if self.inlets: mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), @@ -349,10 +349,9 @@ def 
generate_inlet_outlet_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: yield mcp # Force entity materialization - for iolet in self.inlets + self.outlets: - mcp = MetadataChangeProposalWrapper( - entityUrn=str(iolet), - aspect=StatusClass(removed=False), - ) - - yield mcp + if materialize_iolets: + for iolet in self.inlets + self.outlets: + yield MetadataChangeProposalWrapper( + entityUrn=str(iolet), + aspect=StatusClass(removed=False), + ) diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py index 9085ac152ea0b2..d6aa695665e4ef 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp.py +++ b/metadata-ingestion/src/datahub/emitter/mcp.py @@ -240,7 +240,7 @@ def from_obj_require_wrapper( return mcp def as_workunit( - self, *, treat_errors_as_warnings: bool = False + self, *, treat_errors_as_warnings: bool = False, is_primary_source: bool = True ) -> "MetadataWorkUnit": from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -254,10 +254,12 @@ def as_workunit( id=f"{self.entityUrn}-{self.aspectName}-{ts}", mcp=self, treat_errors_as_warnings=treat_errors_as_warnings, + is_primary_source=is_primary_source, ) return MetadataWorkUnit( id=f"{self.entityUrn}-{self.aspectName}", mcp=self, treat_errors_as_warnings=treat_errors_as_warnings, + is_primary_source=is_primary_source, ) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 2ce9e07bc57bc8..fae260226195ce 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -17,6 +17,7 @@ from datahub.configuration.time_window_config import BaseTimeWindowConfig from datahub.emitter.mce_builder import make_dataplatform_instance_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import entity_supports_aspect from 
datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.schema_classes import ( BrowsePathEntryClass, @@ -64,9 +65,9 @@ def auto_status_aspect( """ For all entities that don't have a status aspect, add one with removed set to false. """ - all_urns: Set[str] = set() status_urns: Set[str] = set() + skip_urns: Set[str] = set() for wu in stream: urn = wu.get_urn() all_urns.add(urn) @@ -89,9 +90,17 @@ def auto_status_aspect( else: raise ValueError(f"Unexpected type {type(wu.metadata)}") + if not isinstance( + wu.metadata, MetadataChangeEventClass + ) and not entity_supports_aspect(wu.metadata.entityType, StatusClass): + # If any entity does not support aspect 'status' then skip that entity from adding status aspect. + # Example like dataProcessInstance doesn't suppport status aspect. + # If not skipped gives error: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance + skip_urns.add(urn) + yield wu - for urn in sorted(all_urns - status_urns): + for urn in sorted(all_urns - status_urns - skip_urns): yield MetadataChangeProposalWrapper( entityUrn=urn, aspect=StatusClass(removed=False), diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py new file mode 100644 index 00000000000000..b0843182c5cac4 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -0,0 +1,145 @@ +import logging +from dataclasses import dataclass, field as dataclass_field +from typing import Dict, List, Optional + +import pydantic +from pydantic import Field, root_validator + +from datahub.configuration.common import AllowDenyPattern, ConfigModel +from datahub.configuration.source_common import DEFAULT_ENV, 
DatasetSourceConfigMixin +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StaleEntityRemovalSourceReport, + StatefulStaleMetadataRemovalConfig, +) +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionConfigBase, +) +from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig + +logger = logging.getLogger(__name__) + + +class Constant: + """ + keys used in fivetran plugin + """ + + ORCHESTRATOR = "fivetran" + # table column name + SOURCE_SCHEMA_NAME = "source_schema_name" + SOURCE_TABLE_NAME = "source_table_name" + SOURCE_TABLE_ID = "source_table_id" + SOURCE_COLUMN_NAME = "source_column_name" + DESTINATION_SCHEMA_NAME = "destination_schema_name" + DESTINATION_TABLE_NAME = "destination_table_name" + DESTINATION_TABLE_ID = "destination_table_id" + DESTINATION_COLUMN_NAME = "destination_column_name" + SYNC_ID = "sync_id" + MESSAGE_DATA = "message_data" + TIME_STAMP = "time_stamp" + STATUS = "status" + USER_ID = "user_id" + GIVEN_NAME = "given_name" + FAMILY_NAME = "family_name" + CONNECTOR_ID = "connector_id" + CONNECTOR_NAME = "connector_name" + CONNECTOR_TYPE_ID = "connector_type_id" + PAUSED = "paused" + SYNC_FREQUENCY = "sync_frequency" + DESTINATION_ID = "destination_id" + CONNECTING_USER_ID = "connecting_user_id" + # Job status constants + SUCCESSFUL = "SUCCESSFUL" + FAILURE_WITH_TASK = "FAILURE_WITH_TASK" + CANCELED = "CANCELED" + + +KNOWN_DATA_PLATFORM_MAPPING = { + "postgres": "postgres", + "snowflake": "snowflake", +} + + +class DestinationConfig(BaseSnowflakeConfig): + database: str = Field(description="The fivetran connector log database.") + log_schema: str = Field(description="The fivetran connector log schema.") + + +class FivetranLogConfig(ConfigModel): + destination_platform: str = pydantic.Field( + default="snowflake", + description="The destination platform where fivetran connector log tables are dumped.", + ) + destination_config: Optional[DestinationConfig] = 
pydantic.Field( + default=None, + description="If destination platform is 'snowflake', provide snowflake configuration.", + ) + + @root_validator(pre=True) + def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict: + destination_platform = values["destination_platform"] + if destination_platform == "snowflake": + if "destination_config" not in values: + raise ValueError( + "If destination platform is 'snowflake', user must provide snowflake destination configuration in the recipe." + ) + else: + raise ValueError( + f"Destination platform '{destination_platform}' is not yet supported." + ) + return values + + +@dataclass +class FivetranSourceReport(StaleEntityRemovalSourceReport): + connectors_scanned: int = 0 + filtered_connectors: List[str] = dataclass_field(default_factory=list) + + def report_connectors_scanned(self, count: int = 1) -> None: + self.connectors_scanned += count + + def report_connectors_dropped(self, model: str) -> None: + self.filtered_connectors.append(model) + + +class PlatformDetail(ConfigModel): + platform_instance: Optional[str] = pydantic.Field( + default=None, + description="The instance of the platform that all assets produced by this recipe belong to", + ) + env: str = pydantic.Field( + default=DEFAULT_ENV, + description="The environment that all assets produced by DataHub platform ingestion source belong to", + ) + + +class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): + fivetran_log_config: FivetranLogConfig = pydantic.Field( + description="Fivetran log connector destination server configurations.", + ) + connector_patterns: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns for connectors to filter in ingestion.", + ) + include_column_lineage: bool = Field( + default=True, + description="Populates table->table column lineage.", + ) + sources_to_database: Dict[str, str] = pydantic.Field( + default={}, + description="A mapping of the connector's 
all sources to its database. Use connector id as key.", + ) + # Configuration for stateful ingestion + stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field( + default=None, description="Fivetran Stateful Ingestion Config." + ) + # Fivetran connector all sources to platform instance mapping + sources_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field( + default={}, + description="A mapping of the connector's all sources dataset to platform instance. Use connector id as key.", + ) + # Fivetran destination to platform instance mapping + destination_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field( + default={}, + description="A mapping of destination dataset to platform instance. Use destination id as key.", + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py new file mode 100644 index 00000000000000..82bb5f3467c2a6 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py @@ -0,0 +1,36 @@ +from dataclasses import dataclass +from typing import List + + +@dataclass +class ColumnLineage: + source_column: str + destination_column: str + + +@dataclass +class TableLineage: + source_table: str + destination_table: str + column_lineage: List[ColumnLineage] + + +@dataclass +class Connector: + connector_id: str + connector_name: str + connector_type: str + paused: bool + sync_frequency: int + destination_id: str + user_name: str + table_lineage: List[TableLineage] + jobs: List["Job"] + + +@dataclass +class Job: + job_id: str + start_time: int + end_time: int + status: str diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py new file mode 100644 index 00000000000000..c0395b4e4e7963 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -0,0
+1,289 @@ +import logging +from typing import Dict, Iterable, List, Optional + +import datahub.emitter.mce_builder as builder +from datahub.api.entities.datajob import DataFlow, DataJob +from datahub.api.entities.dataprocess.dataprocess_instance import ( + DataProcessInstance, + InstanceRunResult, +) +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SourceCapability, + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.fivetran.config import ( + KNOWN_DATA_PLATFORM_MAPPING, + Constant, + FivetranSourceConfig, + FivetranSourceReport, + PlatformDetail, +) +from datahub.ingestion.source.fivetran.data_classes import Connector, Job +from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StaleEntityRemovalHandler, +) +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionSourceBase, +) +from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( + FineGrainedLineage, + FineGrainedLineageDownstreamType, + FineGrainedLineageUpstreamType, +) +from datahub.metadata.schema_classes import StatusClass +from datahub.utilities.urns.data_flow_urn import DataFlowUrn +from datahub.utilities.urns.dataset_urn import DatasetUrn + +# Logger instance +logger = logging.getLogger(__name__) + + +@platform_name("Fivetran") +@config_class(FivetranSourceConfig) +@support_status(SupportStatus.INCUBATING) +@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") +@capability( + SourceCapability.LINEAGE_FINE, + "Enabled by default, can be disabled via configuration `include_column_lineage`", +) +class FivetranSource(StatefulIngestionSourceBase): + """ + This plugin extracts 
fivetran users, connectors, destinations and sync history. + This plugin is in beta and has only been tested on Snowflake connector. + """ + + config: FivetranSourceConfig + report: FivetranSourceReport + platform: str = "fivetran" + + def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext): + super(FivetranSource, self).__init__(config, ctx) + self.config = config + self.report = FivetranSourceReport() + + self.audit_log = FivetranLogAPI(self.config.fivetran_log_config) + + # Create and register the stateful ingestion use-case handler. + self.stale_entity_removal_handler = StaleEntityRemovalHandler.create( + self, self.config, self.ctx + ) + + def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None: + input_dataset_urn_list: List[DatasetUrn] = [] + output_dataset_urn_list: List[DatasetUrn] = [] + fine_grained_lineage: List[FineGrainedLineage] = [] + + source_platform_detail: PlatformDetail = PlatformDetail() + destination_platform_detail: PlatformDetail = PlatformDetail() + # Get platform details for connector source + source_platform_detail = self.config.sources_to_platform_instance.get( + connector.connector_id, PlatformDetail() + ) + + # Get platform details for destination + destination_platform_detail = self.config.destination_to_platform_instance.get( + connector.destination_id, PlatformDetail() + ) + + # Get database for connector source + # TODO: Once Fivetran exposes this, we shouldn't ask for it via config. + source_database: Optional[str] = self.config.sources_to_database.get( + connector.connector_id + ) + + if connector.connector_type in KNOWN_DATA_PLATFORM_MAPPING: + source_platform = KNOWN_DATA_PLATFORM_MAPPING[connector.connector_type] + else: + source_platform = connector.connector_type + logger.info( + f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity." 
+ ) + + for table_lineage in connector.table_lineage: + input_dataset_urn = DatasetUrn.create_from_ids( + platform_id=source_platform, + table_name=f"{source_database.lower()}.{table_lineage.source_table}" + if source_database + else table_lineage.source_table, + env=source_platform_detail.env, + platform_instance=source_platform_detail.platform_instance, + ) + input_dataset_urn_list.append(input_dataset_urn) + + output_dataset_urn: Optional[DatasetUrn] = None + if self.audit_log.fivetran_log_database: + output_dataset_urn = DatasetUrn.create_from_ids( + platform_id=self.config.fivetran_log_config.destination_platform, + table_name=f"{self.audit_log.fivetran_log_database.lower()}.{table_lineage.destination_table}", + env=destination_platform_detail.env, + platform_instance=destination_platform_detail.platform_instance, + ) + output_dataset_urn_list.append(output_dataset_urn) + + if self.config.include_column_lineage: + for column_lineage in table_lineage.column_lineage: + fine_grained_lineage.append( + FineGrainedLineage( + upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, + upstreams=[ + builder.make_schema_field_urn( + str(input_dataset_urn), + column_lineage.source_column, + ) + ] + if input_dataset_urn + else [], + downstreamType=FineGrainedLineageDownstreamType.FIELD, + downstreams=[ + builder.make_schema_field_urn( + str(output_dataset_urn), + column_lineage.destination_column, + ) + ] + if output_dataset_urn + else [], + ) + ) + + datajob.inlets.extend(input_dataset_urn_list) + datajob.outlets.extend(output_dataset_urn_list) + datajob.fine_grained_lineages.extend(fine_grained_lineage) + return None + + def _generate_dataflow_from_connector(self, connector: Connector) -> DataFlow: + return DataFlow( + orchestrator=Constant.ORCHESTRATOR, + id=connector.connector_id, + env=self.config.env, + name=connector.connector_name, + platform_instance=self.config.platform_instance, + ) + + def _generate_datajob_from_connector(self, connector: Connector) -> DataJob: 
+ dataflow_urn = DataFlowUrn.create_from_ids( + orchestrator=Constant.ORCHESTRATOR, + flow_id=connector.connector_id, + env=self.config.env, + platform_instance=self.config.platform_instance, + ) + datajob = DataJob( + id=connector.connector_id, + flow_urn=dataflow_urn, + name=connector.connector_name, + owners={connector.user_name}, + ) + + job_property_bag: Dict[str, str] = {} + allowed_connection_keys = [ + Constant.PAUSED, + Constant.SYNC_FREQUENCY, + Constant.DESTINATION_ID, + ] + for key in allowed_connection_keys: + if hasattr(connector, key) and getattr(connector, key) is not None: + job_property_bag[key] = repr(getattr(connector, key)) + datajob.properties = job_property_bag + + # Map connector source and destination table with dataset entity + # Also extend the fine grained lineage of column if include_column_lineage is True + self._extend_lineage(connector=connector, datajob=datajob) + + # TODO: Add fine grained lineages of dataset after FineGrainedLineageDownstreamType.DATASET enabled + + return datajob + + def _generate_dpi_from_job(self, job: Job, datajob: DataJob) -> DataProcessInstance: + return DataProcessInstance.from_datajob( + datajob=datajob, + id=job.job_id, + clone_inlets=True, + clone_outlets=True, + ) + + def _get_dpi_workunits( + self, job: Job, dpi: DataProcessInstance + ) -> Iterable[MetadataWorkUnit]: + status_result_map: Dict[str, InstanceRunResult] = { + Constant.SUCCESSFUL: InstanceRunResult.SUCCESS, + Constant.FAILURE_WITH_TASK: InstanceRunResult.FAILURE, + Constant.CANCELED: InstanceRunResult.SKIPPED, + } + if job.status not in status_result_map: + logger.debug( + f"Status should be either SUCCESSFUL, FAILURE_WITH_TASK or CANCELED and it was " + f"{job.status}" + ) + return [] + result = status_result_map[job.status] + start_timestamp_millis = job.start_time * 1000 + for mcp in dpi.generate_mcp( + created_ts_millis=start_timestamp_millis, materialize_iolets=False + ): + yield mcp.as_workunit() + for mcp in 
dpi.start_event_mcp(start_timestamp_millis): + yield mcp.as_workunit() + for mcp in dpi.end_event_mcp( + end_timestamp_millis=job.end_time * 1000, + result=result, + result_type=Constant.ORCHESTRATOR, + ): + yield mcp.as_workunit() + + def _get_connector_workunits( + self, connector: Connector + ) -> Iterable[MetadataWorkUnit]: + self.report.report_connectors_scanned() + # Create dataflow entity with same name as connector name + dataflow = self._generate_dataflow_from_connector(connector) + for mcp in dataflow.generate_mcp(): + yield mcp.as_workunit() + + # Map Fivetran's connector entity with Datahub's datajob entity + datajob = self._generate_datajob_from_connector(connector) + for mcp in datajob.generate_mcp(materialize_iolets=True): + if mcp.entityType == "dataset" and isinstance(mcp.aspect, StatusClass): + # While we "materialize" the referenced datasets, we don't want them + # to be tracked by stateful ingestion. + yield mcp.as_workunit(is_primary_source=False) + else: + yield mcp.as_workunit() + + # Map Fivetran's job/sync history entity with Datahub's data process entity + for job in connector.jobs: + dpi = self._generate_dpi_from_job(job, datajob) + yield from self._get_dpi_workunits(job, dpi) + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: + config = FivetranSourceConfig.parse_obj(config_dict) + return cls(config, ctx) + + def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: + return [ + *super().get_workunit_processors(), + self.stale_entity_removal_handler.workunit_processor, + ] + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + """ + Datahub Ingestion framework invoke this method + """ + logger.info("Fivetran plugin execution is started") + connectors = self.audit_log.get_connectors_list() + for connector in connectors: + if not self.config.connector_patterns.allowed(connector.connector_name): + self.report.report_connectors_dropped(connector.connector_name) + 
continue + logger.info(f"Processing connector id: {connector.connector_id}") + yield from self._get_connector_workunits(connector) + + def get_report(self) -> SourceReport: + return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py new file mode 100644 index 00000000000000..d5d146559d9183 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -0,0 +1,147 @@ +import json +import logging +from typing import Any, Dict, List, Optional + +from sqlalchemy import create_engine + +from datahub.ingestion.source.fivetran.config import Constant, FivetranLogConfig +from datahub.ingestion.source.fivetran.data_classes import ( + ColumnLineage, + Connector, + Job, + TableLineage, +) +from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery + +logger: logging.Logger = logging.getLogger(__name__) + + +class FivetranLogAPI: + def __init__(self, fivetran_log_config: FivetranLogConfig) -> None: + self.fivetran_log_database: Optional[str] = None + self.fivetran_log_config = fivetran_log_config + self.engine = self._get_log_destination_engine() + + def _get_log_destination_engine(self) -> Any: + destination_platform = self.fivetran_log_config.destination_platform + engine = None + # For every destination, create sqlalchemy engine, + # select the database and schema and set fivetran_log_database class variable + if destination_platform == "snowflake": + snowflake_destination_config = self.fivetran_log_config.destination_config + if snowflake_destination_config is not None: + engine = create_engine( + snowflake_destination_config.get_sql_alchemy_url(), + **snowflake_destination_config.get_options(), + ) + engine.execute( + FivetranLogQuery.use_schema( + snowflake_destination_config.database, + snowflake_destination_config.log_schema, + ) + ) + self.fivetran_log_database = 
snowflake_destination_config.database + return engine + + def _query(self, query: str) -> List[Dict]: + logger.debug("Query : {}".format(query)) + resp = self.engine.execute(query) + return [row for row in resp] + + def _get_table_lineage(self, connector_id: str) -> List[TableLineage]: + table_lineage_result = self._query( + FivetranLogQuery.get_table_lineage_query(connector_id=connector_id) + ) + table_lineage_list: List[TableLineage] = [] + for table_lineage in table_lineage_result: + column_lineage_result = self._query( + FivetranLogQuery.get_column_lineage_query( + source_table_id=table_lineage[Constant.SOURCE_TABLE_ID], + destination_table_id=table_lineage[Constant.DESTINATION_TABLE_ID], + ) + ) + column_lineage_list: List[ColumnLineage] = [ + ColumnLineage( + source_column=column_lineage[Constant.SOURCE_COLUMN_NAME], + destination_column=column_lineage[Constant.DESTINATION_COLUMN_NAME], + ) + for column_lineage in column_lineage_result + ] + table_lineage_list.append( + TableLineage( + source_table=f"{table_lineage[Constant.SOURCE_SCHEMA_NAME]}.{table_lineage[Constant.SOURCE_TABLE_NAME]}", + destination_table=f"{table_lineage[Constant.DESTINATION_SCHEMA_NAME]}.{table_lineage[Constant.DESTINATION_TABLE_NAME]}", + column_lineage=column_lineage_list, + ) + ) + + return table_lineage_list + + def _get_jobs_list(self, connector_id: str) -> List[Job]: + jobs: List[Job] = [] + sync_start_logs = { + row[Constant.SYNC_ID]: row + for row in self._query( + FivetranLogQuery.get_sync_start_logs_query(connector_id=connector_id) + ) + } + sync_end_logs = { + row[Constant.SYNC_ID]: row + for row in self._query( + FivetranLogQuery.get_sync_end_logs_query(connector_id=connector_id) + ) + } + for sync_id in sync_start_logs.keys(): + if sync_end_logs.get(sync_id) is None: + # If no sync-end event log for this sync id that means sync is still in progress + continue + + message_data = json.loads(sync_end_logs[sync_id][Constant.MESSAGE_DATA]) + if isinstance(message_data, str): + # 
Sometimes message_data contains json string inside string + # Ex: '"{\"status\":\"SUCCESSFUL\"}"' + # Hence, need to do json loads twice. + message_data = json.loads(message_data) + + jobs.append( + Job( + job_id=sync_id, + start_time=round( + sync_start_logs[sync_id][Constant.TIME_STAMP].timestamp() + ), + end_time=round( + sync_end_logs[sync_id][Constant.TIME_STAMP].timestamp() + ), + status=message_data[Constant.STATUS], + ) + ) + return jobs + + def _get_user_name(self, user_id: str) -> str: + user_details = self._query(FivetranLogQuery.get_user_query(user_id=user_id))[0] + return ( + f"{user_details[Constant.GIVEN_NAME]} {user_details[Constant.FAMILY_NAME]}" + ) + + def get_connectors_list(self) -> List[Connector]: + connectors: List[Connector] = [] + connector_list = self._query(FivetranLogQuery.get_connectors_query()) + for connector in connector_list: + connectors.append( + Connector( + connector_id=connector[Constant.CONNECTOR_ID], + connector_name=connector[Constant.CONNECTOR_NAME], + connector_type=connector[Constant.CONNECTOR_TYPE_ID], + paused=connector[Constant.PAUSED], + sync_frequency=connector[Constant.SYNC_FREQUENCY], + destination_id=connector[Constant.DESTINATION_ID], + user_name=self._get_user_name( + connector[Constant.CONNECTING_USER_ID] + ), + table_lineage=self._get_table_lineage( + connector[Constant.CONNECTOR_ID] + ), + jobs=self._get_jobs_list(connector[Constant.CONNECTOR_ID]), + ) + ) + return connectors diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py new file mode 100644 index 00000000000000..4f52fcd5d884fb --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -0,0 +1,76 @@ +class FivetranLogQuery: + @staticmethod + def use_schema(db_name: str, schema_name: str) -> str: + return f'use schema "{db_name}"."{schema_name}"' + + @staticmethod + def get_connectors_query() -> str: + 
return """ + SELECT connector_id as "CONNECTOR_ID", + connecting_user_id as "CONNECTING_USER_ID", + connector_type_id as "CONNECTOR_TYPE_ID", + connector_name as "CONNECTOR_NAME", + paused as "PAUSED", + sync_frequency as "SYNC_FREQUENCY", + destination_id as "DESTINATION_ID" + FROM CONNECTOR + WHERE _fivetran_deleted = FALSE""" + + @staticmethod + def get_user_query(user_id: str) -> str: + return f""" + SELECT id as "USER_ID", + given_name as "GIVEN_NAME", + family_name as "FAMILY_NAME" + FROM USER + WHERE id = '{user_id}'""" + + @staticmethod + def get_sync_start_logs_query( + connector_id: str, + ) -> str: + return f""" + SELECT time_stamp as "TIME_STAMP", + sync_id as "SYNC_ID" + FROM LOG + WHERE message_event = 'sync_start' + and connector_id = '{connector_id}' order by time_stamp""" + + @staticmethod + def get_sync_end_logs_query(connector_id: str) -> str: + return f""" + SELECT time_stamp as "TIME_STAMP", + sync_id as "SYNC_ID", + message_data as "MESSAGE_DATA" + FROM LOG + WHERE message_event = 'sync_end' + and connector_id = '{connector_id}' order by time_stamp""" + + @staticmethod + def get_table_lineage_query(connector_id: str) -> str: + return f""" + SELECT stm.id as "SOURCE_TABLE_ID", + stm.name as "SOURCE_TABLE_NAME", + ssm.name as "SOURCE_SCHEMA_NAME", + dtm.id as "DESTINATION_TABLE_ID", + dtm.name as "DESTINATION_TABLE_NAME", + dsm.name as "DESTINATION_SCHEMA_NAME" + FROM table_lineage as tl + JOIN source_table_metadata as stm on tl.source_table_id = stm.id + JOIN destination_table_metadata as dtm on tl.destination_table_id = dtm.id + JOIN source_schema_metadata as ssm on stm.schema_id = ssm.id + JOIN destination_schema_metadata as dsm on dtm.schema_id = dsm.id + WHERE stm.connector_id = '{connector_id}'""" + + @staticmethod + def get_column_lineage_query( + source_table_id: str, destination_table_id: str + ) -> str: + return f""" + SELECT scm.name as "SOURCE_COLUMN_NAME", + dcm.name as "DESTINATION_COLUMN_NAME" + FROM column_lineage as cl + JOIN 
source_column_metadata as scm on + (cl.source_column_id = scm.id and scm.table_id = {source_table_id}) + JOIN destination_column_metadata as dcm on + (cl.destination_column_id = dcm.id and dcm.table_id = {destination_table_id})""" diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py index c3e8c175f1de54..9fc697018ecd6b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py @@ -12,7 +12,7 @@ OAUTH_AUTHENTICATOR, ) -from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.oauth import OAuthConfiguration, OAuthIdentityProvider from datahub.configuration.time_window_config import BaseTimeWindowConfig from datahub.configuration.validate_field_rename import pydantic_renamed_field @@ -42,9 +42,14 @@ SNOWFLAKE_HOST_SUFFIX = ".snowflakecomputing.com" -class BaseSnowflakeConfig(BaseTimeWindowConfig): +class BaseSnowflakeConfig(ConfigModel): # Note: this config model is also used by the snowflake-usage source. + options: dict = pydantic.Field( + default_factory=dict, + description="Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.", + ) + scheme: str = "snowflake" username: Optional[str] = pydantic.Field( default=None, description="Snowflake username." @@ -82,14 +87,6 @@ class BaseSnowflakeConfig(BaseTimeWindowConfig): default=None, description="Snowflake warehouse." ) role: Optional[str] = pydantic.Field(default=None, description="Snowflake role.") - include_table_lineage: bool = pydantic.Field( - default=True, - description="If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. 
Requires appropriate grants given to the role and Snowflake Enterprise Edition or above.", - ) - include_view_lineage: bool = pydantic.Field( - default=True, - description="If enabled, populates the snowflake view->table and table->view lineages. Requires appropriate grants given to the role, and include_table_lineage to be True. view->table lineage requires Snowflake Enterprise Edition or above.", - ) connect_args: Optional[Dict[str, Any]] = pydantic.Field( default=None, description="Connect args to pass to Snowflake SqlAlchemy driver", @@ -166,18 +163,6 @@ def _check_oauth_config(oauth_config: Optional[OAuthConfiguration]) -> None: "but should be set when using use_certificate false for oauth_config" ) - @pydantic.root_validator() - def validate_include_view_lineage(cls, values): - if ( - "include_table_lineage" in values - and not values.get("include_table_lineage") - and values.get("include_view_lineage") - ): - raise ValueError( - "include_table_lineage must be True for include_view_lineage to be set." 
- ) - return values - def get_sql_alchemy_url( self, database: Optional[str] = None, @@ -261,28 +246,8 @@ def get_connect_args(self) -> dict: self._computed_connect_args = connect_args return connect_args - -class SnowflakeConfig(BaseSnowflakeConfig, SQLCommonConfig): - database_pattern: AllowDenyPattern = AllowDenyPattern( - deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"] - ) - - ignore_start_time_lineage: bool = False - upstream_lineage_in_report: bool = False - - def get_sql_alchemy_url( - self, - database: Optional[str] = None, - username: Optional[str] = None, - password: Optional[pydantic.SecretStr] = None, - role: Optional[str] = None, - ) -> str: - return super().get_sql_alchemy_url( - database=database, username=username, password=password, role=role - ) - def get_options(self) -> dict: - options_connect_args: Dict = super().get_connect_args() + options_connect_args: Dict = self.get_connect_args() options_connect_args.update(self.options.get("connect_args", {})) self.options["connect_args"] = options_connect_args return self.options @@ -372,3 +337,34 @@ def get_connection(self) -> snowflake.connector.SnowflakeConnection: else: # not expected to be here raise Exception("Not expected to be here.") + + +class SnowflakeConfig(BaseSnowflakeConfig, BaseTimeWindowConfig, SQLCommonConfig): + + include_table_lineage: bool = pydantic.Field( + default=True, + description="If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires appropriate grants given to the role and Snowflake Enterprise Edition or above.", + ) + include_view_lineage: bool = pydantic.Field( + default=True, + description="If enabled, populates the snowflake view->table and table->view lineages. Requires appropriate grants given to the role, and include_table_lineage to be True. 
view->table lineage requires Snowflake Enterprise Edition or above.", + ) + + database_pattern: AllowDenyPattern = AllowDenyPattern( + deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"] + ) + + ignore_start_time_lineage: bool = False + upstream_lineage_in_report: bool = False + + @pydantic.root_validator() + def validate_include_view_lineage(cls, values): + if ( + "include_table_lineage" in values + and not values.get("include_table_lineage") + and values.get("include_view_lineage") + ): + raise ValueError( + "include_table_lineage must be True for include_view_lineage to be set." + ) + return values diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_golden.json new file mode 100644 index 00000000000000..a72c960a722969 --- /dev/null +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_golden.json @@ -0,0 +1,658 @@ +[ +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "postgres" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + 
"lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "paused": "False", + "sync_frequency": "1440", + "destination_id": "'interval_unconstitutional'" + }, + "name": "postgres", + "type": { + "string": "COMMAND" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.employee,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.company,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.employee,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV),name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.employee,PROD),name)" + ], + "confidenceScore": 1.0 + }, + { + 
"upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.company,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV),name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.company,PROD),name)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.employee,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": 
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.company,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:Shubham Jagtap", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "4c9a03d6-eded-4422-a46a-163266e58243", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1695191853000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + 
"json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "upstreamInstances": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.employee,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.company,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191853000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "STARTED" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + 
"changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191885000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "f773d1e9-c791-48f4-894f-8cf9b3dfc834", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1696343730000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "upstreamInstances": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": 
"no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.employee,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.company,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1696343730000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "STARTED" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1696343732000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "COMPLETE", + "result": { + "type": "SKIPPED", + "nativeResultType": "fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "63c2fc85-600b-455f-9ba0-f576522465be", + "type": "BATCH_SCHEDULED", + 
"created": { + "time": 1696343755000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "upstreamInstances": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.employee,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.postgres_public.company,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + 
"aspect": { + "json": { + "timestampMillis": 1696343755000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "STARTED" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1696343790000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "COMPLETE", + "result": { + "type": "FAILURE", + "nativeResultType": "fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py new file mode 100644 index 00000000000000..62b3df12e1b9d3 --- /dev/null +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -0,0 +1,192 @@ +import datetime +from unittest import mock +from unittest.mock import MagicMock + +import pytest 
+from freezegun import freeze_time + +from datahub.ingestion.run.pipeline import Pipeline +from datahub.ingestion.source.fivetran.config import DestinationConfig +from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery +from tests.test_helpers import mce_helpers + +FROZEN_TIME = "2022-06-07 17:00:00" + + +def default_query_results(query): + if query == FivetranLogQuery.use_schema("TEST_DATABASE", "TEST_SCHEMA"): + return [] + elif query == FivetranLogQuery.get_connectors_query(): + return [ + { + "connector_id": "calendar_elected", + "connecting_user_id": "reapply_phone", + "connector_type_id": "postgres", + "connector_name": "postgres", + "paused": False, + "sync_frequency": 1440, + "destination_id": "interval_unconstitutional", + }, + ] + elif query == FivetranLogQuery.get_table_lineage_query("calendar_elected"): + return [ + { + "source_table_id": "10040", + "source_table_name": "employee", + "source_schema_name": "public", + "destination_table_id": "7779", + "destination_table_name": "employee", + "destination_schema_name": "postgres_public", + }, + { + "source_table_id": "10041", + "source_table_name": "company", + "source_schema_name": "public", + "destination_table_id": "7780", + "destination_table_name": "company", + "destination_schema_name": "postgres_public", + }, + ] + elif query == FivetranLogQuery.get_column_lineage_query( + "10040", "7779" + ) or query == FivetranLogQuery.get_column_lineage_query("10041", "7780"): + return [ + { + "source_column_name": "id", + "destination_column_name": "id", + }, + { + "source_column_name": "name", + "destination_column_name": "name", + }, + ] + elif query == FivetranLogQuery.get_user_query("reapply_phone"): + return [ + { + "user_id": "reapply_phone", + "given_name": "Shubham", + "family_name": "Jagtap", + } + ] + elif query == FivetranLogQuery.get_sync_start_logs_query("calendar_elected"): + return [ + { + "time_stamp": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000), + "sync_id": 
"4c9a03d6-eded-4422-a46a-163266e58243", + }, + { + "time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 30, 345000), + "sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834", + }, + { + "time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 55, 401000), + "sync_id": "63c2fc85-600b-455f-9ba0-f576522465be", + }, + ] + elif query == FivetranLogQuery.get_sync_end_logs_query("calendar_elected"): + return [ + { + "time_stamp": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000), + "sync_id": "4c9a03d6-eded-4422-a46a-163266e58243", + "message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"', + }, + { + "time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 31, 512000), + "sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834", + "message_data": '"{\\"reason\\":\\"Sync has been cancelled because of a user action in the dashboard.Standard Config updated.\\",\\"status\\":\\"CANCELED\\"}"', + }, + { + "time_stamp": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000), + "sync_id": "63c2fc85-600b-455f-9ba0-f576522465be", + "message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"', + }, + ] + # Unreachable code + raise Exception(f"Unknown query {query}") + + +@freeze_time(FROZEN_TIME) +@pytest.mark.integration +def test_fivetran_basic(pytestconfig, tmp_path): + test_resources_dir = pytestconfig.rootpath / "tests/integration/fivetran" + + # Run the metadata ingestion pipeline. 
+ output_file = tmp_path / "fivetran_test_events.json" + golden_file = test_resources_dir / "fivetran_golden.json" + + with mock.patch( + "datahub.ingestion.source.fivetran.fivetran_log_api.create_engine" + ) as mock_create_engine: + connection_magic_mock = MagicMock() + connection_magic_mock.execute.side_effect = default_query_results + + mock_create_engine.return_value = connection_magic_mock + + pipeline = Pipeline.create( + { + "run_id": "powerbi-test", + "source": { + "type": "fivetran", + "config": { + "fivetran_log_config": { + "destination_platform": "snowflake", + "destination_config": { + "account_id": "TESTID", + "warehouse": "TEST_WH", + "username": "test", + "password": "test@123", + "database": "TEST_DATABASE", + "role": "TESTROLE", + "log_schema": "TEST_SCHEMA", + }, + }, + "connector_patterns": { + "allow": [ + "postgres", + ] + }, + "sources_to_database": { + "calendar_elected": "postgres_db", + }, + "sources_to_platform_instance": { + "calendar_elected": { + "env": "DEV", + } + }, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{output_file}", + }, + }, + } + ) + + pipeline.run() + pipeline.raise_from_status() + golden_file = "fivetran_golden.json" + + mce_helpers.check_golden_file( + pytestconfig, + output_path=f"{output_file}", + golden_path=f"{test_resources_dir}/{golden_file}", + ) + + +@freeze_time(FROZEN_TIME) +def test_fivetran_snowflake_destination_config(pytestconfig, tmp_path): + snowflake_dest = DestinationConfig( + account_id="TESTID", + warehouse="TEST_WH", + username="test", + password="test@123", + database="TEST_DATABASE", + role="TESTROLE", + log_schema="TEST_SCHEMA", + ) + assert ( + snowflake_dest.get_sql_alchemy_url() + == "snowflake://test:test%40123@TESTID?application=acryl_datahub&authenticator=SNOWFLAKE&role=TESTROLE&warehouse=TEST_WH" + ) diff --git a/metadata-service/war/src/main/resources/boot/data_platforms.json b/metadata-service/war/src/main/resources/boot/data_platforms.json index 
3d956c5774dedb..3c70eda8561b86 100644 --- a/metadata-service/war/src/main/resources/boot/data_platforms.json +++ b/metadata-service/war/src/main/resources/boot/data_platforms.json @@ -564,5 +564,15 @@ "type": "KEY_VALUE_STORE", "logoUrl": "/assets/platforms/dynamodblogo.png" } + }, + { + "urn": "urn:li:dataPlatform:fivetran", + "aspect": { + "datasetNameDelimiter": ".", + "name": "fivetran", + "displayName": "Fivetran", + "type": "OTHERS", + "logoUrl": "/assets/platforms/fivetranlogo.png" + } } ]