feat(integration/fivetran): Fivetran connector integration #9018

Merged
Changes from 21 commits

Commits (23)
abe52ae
Fivetran source connector initial code added
shubhamjagtap639 Oct 4, 2023
b04ff8c
Code to generate connector workunit added
shubhamjagtap639 Oct 4, 2023
3f392f4
Code to execute query through Sqlalchemy engine added
shubhamjagtap639 Oct 5, 2023
0c295e2
test cases for fivetran source added
shubhamjagtap639 Oct 10, 2023
b6897d1
fivetran source added in base dev requirement
shubhamjagtap639 Oct 10, 2023
7dddda3
Config description modified
shubhamjagtap639 Oct 10, 2023
81cf867
SnowflakeDestinationConfig class renamed to DestinationConfig
shubhamjagtap639 Oct 16, 2023
389c16f
Merge branch 'master' of https://github.com/shubhamjagtap639/datahub …
shubhamjagtap639 Oct 16, 2023
561d2b5
Test case golden file updated
shubhamjagtap639 Oct 16, 2023
d1a13f0
Code changes as per review comment
shubhamjagtap639 Oct 23, 2023
19234ee
Fivetran doc file added
shubhamjagtap639 Oct 25, 2023
2bba626
yml file modified
shubhamjagtap639 Oct 26, 2023
2e045d2
Merge branch 'master' into fivetran-connector-integration
shubhamjagtap639 Oct 26, 2023
f17aaf2
clarify example recipe
hsheth2 Oct 30, 2023
af11bcc
fix status aspects
hsheth2 Oct 30, 2023
5c2afd9
Merge branch 'master' of https://github.com/shubhamjagtap639/datahub …
shubhamjagtap639 Oct 31, 2023
1958c3e
Fivetran source integration changes
shubhamjagtap639 Oct 31, 2023
f1fdb04
Code changes as per review comments
shubhamjagtap639 Nov 1, 2023
c92d4fc
destination dataset urn corrected
shubhamjagtap639 Nov 1, 2023
42dd9ef
lint error fixed
shubhamjagtap639 Nov 1, 2023
5e0f3f5
source to database config added
shubhamjagtap639 Nov 2, 2023
b8883ff
always link to upstreams
hsheth2 Nov 3, 2023
425a74b
Merge branch 'master' into fivetran-connector-integration
hsheth2 Nov 8, 2023
4 changes: 4 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/constants.ts
@@ -29,6 +29,7 @@ import databricksLogo from '../../../../images/databrickslogo.png';
import verticaLogo from '../../../../images/verticalogo.png';
import mlflowLogo from '../../../../images/mlflowlogo.png';
import dynamodbLogo from '../../../../images/dynamodblogo.png';
import fivetranLogo from '../../../../images/fivetranlogo.png';

export const ATHENA = 'athena';
export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`;
@@ -105,6 +106,8 @@ export const DBT_CLOUD = 'dbt-cloud';
export const DBT_CLOUD_URN = `urn:li:dataPlatform:dbt`;
export const VERTICA = 'vertica';
export const VERTICA_URN = `urn:li:dataPlatform:${VERTICA}`;
export const FIVETRAN = 'fivetran';
export const FIVETRAN_URN = `urn:li:dataPlatform:${FIVETRAN}`;

export const PLATFORM_URN_TO_LOGO = {
[ATHENA_URN]: athenaLogo,
@@ -138,6 +141,7 @@ export const PLATFORM_URN_TO_LOGO = {
[SUPERSET_URN]: supersetLogo,
[UNITY_CATALOG_URN]: databricksLogo,
[VERTICA_URN]: verticaLogo,
[FIVETRAN_URN]: fivetranLogo,
};

export const SOURCE_TO_PLATFORM_URN = {
7 changes: 7 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/sources.json
@@ -216,6 +216,13 @@
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/vertica/",
"recipe": "source:\n type: vertica\n config:\n # Coordinates\n host_port: localhost:5433\n # The name of the vertica database\n database: Database_Name\n # Credentials\n username: Vertica_User\n password: Vertica_Password\n\n include_tables: true\n include_views: true\n include_projections: true\n include_models: true\n include_view_lineage: true\n include_projection_lineage: true\n profiling:\n enabled: false\n stateful_ingestion:\n enabled: true "
},
{
"urn": "urn:li:dataPlatform:fivetran",
"name": "fivetran",
"displayName": "Fivetran",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/fivetran/",
"recipe": "source:\n type: fivetran\n config:\n # Fivetran log connector destination server configurations\n fivetran_log_config:\n destination_platform: snowflake\n destination_config:\n # Coordinates\n account_id: snowflake_account_id\n warehouse: warehouse_name\n database: snowflake_db\n log_schema: fivetran_log_schema\n\n # Credentials\n username: ${SNOWFLAKE_USER}\n password: ${SNOWFLAKE_PASS}\n role: snowflake_role\n\n # Optional - filter for certain connector names instead of ingesting everything.\n # connector_patterns:\n # allow:\n # - connector_name\n\n # Optional -- This mapping is optional and only required to configure platform-instance for source\n # A mapping of Fivetran connector id to data platform instance\n # sources_to_platform_instance:\n # calendar_elected:\n # platform_instance: cloud_postgres_instance\n # env: DEV\n\n # Optional -- This mapping is optional and only required to configure platform-instance for destination.\n # A mapping of Fivetran destination id to data platform instance\n # destination_to_platform_instance:\n # calendar_elected:\n # platform_instance: cloud_postgres_instance\n # env: DEV"
},
{
"urn": "urn:li:dataPlatform:custom",
"name": "custom",
Binary file added datahub-web-react/src/images/fivetranlogo.png
86 changes: 86 additions & 0 deletions metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
@@ -0,0 +1,86 @@
## Integration Details

This source extracts the following:

- Fivetran connectors as Data Pipelines and Data Jobs, representing lineage between source and destination.
- Connector sources as DataJob input Datasets.
- Connector destinations as DataJob output Datasets.
- Connector runs as DataProcessInstances (DataJob runs).

## Configuration Notes

1. Fivetran provides the Fivetran Platform Connector, which dumps log events and metadata about connectors, destinations, users, and roles into your destination.
2. You need to set up and complete the initial sync of the Fivetran Platform Connector before using this source. Refer to the [setup guide](https://fivetran.com/docs/logs/fivetran-platform/setup-guide).
3. Once the initial sync of the Fivetran Platform Connector is done, provide its destination platform and configuration in the recipe.

## Concept mapping

| Fivetran | DataHub |
|--------------------------|--------------------------------------------------------------------------------------------------------|
| `Connector` | [DataJob](https://datahubproject.io/docs/generated/metamodel/entities/datajob/) |
| `Source` | [Dataset](https://datahubproject.io/docs/generated/metamodel/entities/dataset/) |
| `Destination` | [Dataset](https://datahubproject.io/docs/generated/metamodel/entities/dataset/) |
| `Connector Run` | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) |

A connector's source and destination are mapped to Datasets, which serve as the inputs and outputs of the corresponding DataJob.
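
To make the mapping concrete, here is a minimal sketch of what the resulting URNs could look like, using DataHub's standard URN builder helpers. The connector id (`calendar_elected`), the dataset names, and the choice of connector id as the flow/job id are illustrative assumptions, not the source's guaranteed naming scheme.

```python
# Illustrative only: how a Fivetran connector and its source/destination tables
# could be represented as DataHub URNs. The ids below are made up.
from datahub.emitter.mce_builder import (
    make_data_flow_urn,
    make_data_job_urn,
    make_dataset_urn,
)

connector_id = "calendar_elected"  # hypothetical Fivetran connector id

# Connector -> DataJob (grouped under a DataFlow for the "fivetran" orchestrator)
flow_urn = make_data_flow_urn(orchestrator="fivetran", flow_id=connector_id, cluster="PROD")
job_urn = make_data_job_urn(
    orchestrator="fivetran", flow_id=connector_id, job_id=connector_id, cluster="PROD"
)

# Source / Destination -> input / output Datasets of that DataJob
source_urn = make_dataset_urn(platform="postgres", name="postgres_db.public.company", env="PROD")
destination_urn = make_dataset_urn(platform="snowflake", name="snowflake_db.public.company", env="PROD")

print(flow_urn, job_urn, source_urn, destination_urn, sep="\n")
```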

## Current limitations

This source currently works only with a Snowflake destination.

## Snowflake destination Configuration Guide
Review comment (Collaborator): explicitly call out that this only works with snowflake for now

1. If your Fivetran Platform Connector destination is Snowflake, you need to provide a user and a role with the correct privileges so that this source can fetch metadata.
2. A Snowflake system admin can create a fivetran_datahub role, grant it the required privileges, and assign it to a user by executing the following Snowflake commands from a user with the ACCOUNTADMIN role or MANAGE GRANTS privilege.

```sql
create or replace role fivetran_datahub;

// Grant access to a warehouse to run queries to view metadata
grant operate, usage on warehouse "<your-warehouse>" to role fivetran_datahub;

// Grant access to view database and schema in which your log and metadata tables exist
grant usage on DATABASE "<fivetran-log-database>" to role fivetran_datahub;
grant usage on SCHEMA "<fivetran-log-database>"."<fivetran-log-schema>" to role fivetran_datahub;

// Grant access to execute select query on schema in which your log and metadata tables exist
grant select on all tables in SCHEMA "<fivetran-log-database>"."<fivetran-log-schema>" to role fivetran_datahub;

// Grant the fivetran_datahub role to the snowflake user.
grant role fivetran_datahub to user snowflake_user;
```
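
As an optional sanity check (not part of the connector itself), you can verify that the new role can connect before running ingestion. This is a minimal sketch that assumes snowflake-sqlalchemy is installed (the `fivetran` plugin pulls in Snowflake dependencies) and that the placeholders are replaced with real values.

```python
# Optional connectivity check for the fivetran_datahub role; all placeholders are assumptions.
from sqlalchemy import create_engine, text

url = (
    "snowflake://<user>:<password>@<account-id>/"
    "<fivetran-log-database>/<fivetran-log-schema>"
    "?warehouse=<your-warehouse>&role=fivetran_datahub"
)
engine = create_engine(url)

with engine.connect() as conn:
    # Confirms the connection works and shows which role/warehouse are actually in use.
    row = conn.execute(text("select current_role(), current_warehouse()")).fetchone()
    print(row)
```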

## Advanced Configurations

### Working with Platform Instances
If you have multiple instances of source/destination systems referenced in your `fivetran` setup, you need to configure platform instances for these systems in the `fivetran` recipe to generate correct lineage edges. Refer to [Working with Platform Instances](https://datahubproject.io/docs/platform-instances) to learn more.

When configuring a platform instance for a source system, use the connector id as the key; for a destination system, use the destination id as the key.

#### Example - Multiple Postgres source connectors, each reading from a different Postgres instance
```yml
# Map of connector source to platform instance
sources_to_platform_instance:
postgres_connector_id1:
platform_instance: cloud_postgres_instance
env: PROD

postgres_connector_id2:
platform_instance: local_postgres_instance
env: DEV
```

#### Example - Multiple Snowflake destinations, each writing to a different Snowflake instance
```yml
# Map of destination to platform instance
destination_to_platform_instance:
snowflake_destination_id1:
platform_instance: prod_snowflake_instance
env: PROD

snowflake_destination_id2:
platform_instance: dev_snowflake_instance
env: DEV
```



43 changes: 43 additions & 0 deletions metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml
@@ -0,0 +1,43 @@
source:
type: fivetran
config:
# Fivetran log connector destination server configurations
fivetran_log_config:
destination_platform: snowflake
destination_config:
# Coordinates
account_id: "abc48144"
warehouse: "COMPUTE_WH"
database: "MY_SNOWFLAKE_DB"
log_schema: "FIVETRAN_LOG"

# Credentials
username: "${SNOWFLAKE_USER}"
password: "${SNOWFLAKE_PASS}"
role: "snowflake_role"

# Optional - filter for certain connector names instead of ingesting everything.
# connector_patterns:
# allow:
# - connector_name

# Optional -- A mapping of the connector's all sources to its database.
# sources_to_database:
# connector_id: source_db

# Optional -- This mapping is optional and only required to configure platform-instance for source
# A mapping of Fivetran connector id to data platform instance
# sources_to_platform_instance:
# connector_id:
# platform_instance: cloud_instance
# env: DEV

# Optional -- This mapping is optional and only required to configure platform-instance for destination.
# A mapping of Fivetran destination id to data platform instance
# destination_to_platform_instance:
# destination_id:
# platform_instance: cloud_instance
# env: DEV

sink:
# sink configs
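
For reference, a recipe like the one above can also be run programmatically instead of via `datahub ingest -c <recipe>.yml`. The sketch below reuses the config keys from this recipe; the sink settings and credential placeholders are assumptions, not values from this PR.

```python
# Minimal sketch of running the Fivetran recipe programmatically; placeholders are assumptions.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "fivetran",
            "config": {
                "fivetran_log_config": {
                    "destination_platform": "snowflake",
                    "destination_config": {
                        "account_id": "abc48144",
                        "warehouse": "COMPUTE_WH",
                        "database": "MY_SNOWFLAKE_DB",
                        "log_schema": "FIVETRAN_LOG",
                        "username": "<user>",
                        "password": "<password>",
                        "role": "snowflake_role",
                    },
                },
            },
        },
        # Assumed sink: a local DataHub GMS reachable over REST.
        "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
    }
)
pipeline.run()
pipeline.raise_from_status()
```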
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.py
@@ -395,6 +395,7 @@
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"},
"unity-catalog": databricks | sqllineage_lib,
"fivetran": snowflake_common,
}

# This is mainly used to exclude plugins from the Docker image.
@@ -525,6 +526,7 @@
"nifi",
"vertica",
"mode",
"fivetran",
"kafka-connect",
]
if plugin
@@ -629,6 +631,7 @@
"unity-catalog = datahub.ingestion.source.unity.source:UnityCatalogSource",
"gcs = datahub.ingestion.source.gcs.gcs_source:GCSSource",
"sql-queries = datahub.ingestion.source.sql_queries:SqlQueriesSource",
"fivetran = datahub.ingestion.source.fivetran.fivetran:FivetranSource",
],
"datahub.ingestion.transformer.plugins": [
"simple_remove_dataset_ownership = datahub.ingestion.transformer.remove_dataset_ownership:SimpleRemoveDatasetOwnership",
25 changes: 15 additions & 10 deletions metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
@@ -100,7 +100,9 @@ def generate_tags_aspect(self) -> Iterable[GlobalTagsClass]:
)
return [tags]

def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
def generate_mcp(
self, materialize_iolets: bool = True
) -> Iterable[MetadataChangeProposalWrapper]:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataJobInfoClass(
@@ -113,7 +115,9 @@ def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
)
yield mcp

yield from self.generate_data_input_output_mcp()
yield from self.generate_data_input_output_mcp(
materialize_iolets=materialize_iolets
)

for owner in self.generate_ownership_aspect():
mcp = MetadataChangeProposalWrapper(
@@ -144,7 +148,9 @@ def emit(
for mcp in self.generate_mcp():
emitter.emit(mcp, callback)

def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
def generate_data_input_output_mcp(
self, materialize_iolets: bool
) -> Iterable[MetadataChangeProposalWrapper]:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataJobInputOutputClass(
@@ -157,10 +163,9 @@ def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
yield mcp

# Force entity materialization
for iolet in self.inlets + self.outlets:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(iolet),
aspect=StatusClass(removed=False),
)

yield mcp
if materialize_iolets:
for iolet in self.inlets + self.outlets:
yield MetadataChangeProposalWrapper(
entityUrn=str(iolet),
aspect=StatusClass(removed=False),
)
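
For context on this change: the Status aspects that force materialization of inlet/outlet datasets are now emitted only when the caller asks for them. The sketch below is a rough illustration of the new flag with made-up URNs; the exact DataJob construction details are assumptions, not code from this PR.

```python
# Rough illustration of the new materialize_iolets flag on DataJob.generate_mcp;
# the ids and URNs here are made up.
from datahub.api.entities.datajob.datajob import DataJob
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn

flow_urn = DataFlowUrn.create_from_ids(
    orchestrator="fivetran", flow_id="calendar_elected", env="PROD"
)
job = DataJob(id="calendar_elected", flow_urn=flow_urn)
job.inlets.append(
    DatasetUrn.create_from_string("urn:li:dataset:(urn:li:dataPlatform:postgres,db.public.src,PROD)")
)
job.outlets.append(
    DatasetUrn.create_from_string("urn:li:dataset:(urn:li:dataPlatform:snowflake,db.public.dst,PROD)")
)

# With materialize_iolets=False, only the DataJob's own aspects (including
# DataJobInputOutput) are emitted; no Status aspects are generated for the
# referenced datasets.
for mcp in job.generate_mcp(materialize_iolets=False):
    print(mcp.entityUrn, mcp.aspectName)
```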
metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
@@ -220,12 +220,10 @@ def emit_process_end(
self._emit_mcp(mcp, emitter, callback)

def generate_mcp(
self, created_ts_millis: Optional[int] = None
self, created_ts_millis: Optional[int] = None, materialize_iolets: bool = True
) -> Iterable[MetadataChangeProposalWrapper]:
"""
Generates mcps from the object
:rtype: Iterable[MetadataChangeProposalWrapper]
"""
"""Generates mcps from the object"""

mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataProcessInstanceProperties(
@@ -253,7 +251,7 @@ def generate_mcp(
)
yield mcp

yield from self.generate_inlet_outlet_mcp()
yield from self.generate_inlet_outlet_mcp(materialize_iolets=materialize_iolets)

@staticmethod
def _emit_mcp(
@@ -329,7 +327,9 @@ def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance":
dpi._template_object = dataflow
return dpi

def generate_inlet_outlet_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
def generate_inlet_outlet_mcp(
self, materialize_iolets: bool
) -> Iterable[MetadataChangeProposalWrapper]:
if self.inlets:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
@@ -349,10 +349,9 @@ def generate_inlet_outlet_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
yield mcp

# Force entity materialization
for iolet in self.inlets + self.outlets:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(iolet),
aspect=StatusClass(removed=False),
)

yield mcp
if materialize_iolets:
for iolet in self.inlets + self.outlets:
yield MetadataChangeProposalWrapper(
entityUrn=str(iolet),
aspect=StatusClass(removed=False),
)
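
The same flag is threaded through DataProcessInstance so that run entities can reference inlets/outlets without re-emitting Status aspects for them. A rough, illustrative sketch follows; the construction details are assumptions rather than code from this PR.

```python
# Illustrative sketch of materialize_iolets on DataProcessInstance.generate_mcp;
# ids and URNs are made up.
from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance
from datahub.utilities.urns.dataset_urn import DatasetUrn

dpi = DataProcessInstance(id="calendar_elected_run_1", orchestrator="fivetran", cluster="PROD")
dpi.inlets.append(
    DatasetUrn.create_from_string("urn:li:dataset:(urn:li:dataPlatform:postgres,db.public.src,PROD)")
)
dpi.outlets.append(
    DatasetUrn.create_from_string("urn:li:dataset:(urn:li:dataPlatform:snowflake,db.public.dst,PROD)")
)

# Only the run's own aspects plus its inlet/outlet references are emitted;
# no Status aspect is generated for the referenced datasets.
for mcp in dpi.generate_mcp(materialize_iolets=False):
    print(mcp.entityUrn, mcp.aspectName)
```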
4 changes: 3 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mcp.py
@@ -240,7 +240,7 @@ def from_obj_require_wrapper(
return mcp

def as_workunit(
self, *, treat_errors_as_warnings: bool = False
self, *, treat_errors_as_warnings: bool = False, is_primary_source: bool = True
) -> "MetadataWorkUnit":
from datahub.ingestion.api.workunit import MetadataWorkUnit

@@ -254,10 +254,12 @@
id=f"{self.entityUrn}-{self.aspectName}-{ts}",
mcp=self,
treat_errors_as_warnings=treat_errors_as_warnings,
is_primary_source=is_primary_source,
)

return MetadataWorkUnit(
id=f"{self.entityUrn}-{self.aspectName}",
mcp=self,
treat_errors_as_warnings=treat_errors_as_warnings,
is_primary_source=is_primary_source,
)
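
A minimal sketch of the new `is_primary_source` flag follows: a source that only references entities owned by another source can mark those workunits as non-primary so that, for example, stateful ingestion does not treat them as its own. The URN below is illustrative.

```python
# Illustrative use of as_workunit(is_primary_source=False); the dataset URN is made up.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.public.dst,PROD)"

# Emit a status aspect for a dataset this source merely references,
# marking the workunit as non-primary.
wu = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=StatusClass(removed=False),
).as_workunit(is_primary_source=False)

print(wu.id, wu.is_primary_source)
```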
13 changes: 11 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -17,6 +17,7 @@
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mce_builder import make_dataplatform_instance_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
@@ -64,9 +65,9 @@ def auto_status_aspect(
"""
For all entities that don't have a status aspect, add one with removed set to false.
"""

all_urns: Set[str] = set()
status_urns: Set[str] = set()
skip_urns: Set[str] = set()
for wu in stream:
urn = wu.get_urn()
all_urns.add(urn)
@@ -89,9 +90,17 @@
else:
raise ValueError(f"Unexpected type {type(wu.metadata)}")

if not isinstance(
wu.metadata, MetadataChangeEventClass
) and not entity_supports_aspect(wu.metadata.entityType, StatusClass):
# If an entity type does not support the status aspect, skip it instead of emitting one.
Review thread:
- Collaborator: we have a map of exactly what entity types support what aspects - can we look this information up there instead?
- Collaborator: I can provide some pointers on how to do this
- Contributor Author: Pls provide some pointers on this
- Collaborator: Use the helper method from here #9120

# For example, dataProcessInstance doesn't support the status aspect.
# If not skipped, ingestion fails with: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance
skip_urns.add(urn)

yield wu

for urn in sorted(all_urns - status_urns):
for urn in sorted(all_urns - status_urns - skip_urns):
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=StatusClass(removed=False),
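
For reference, a minimal sketch of the lookup that drives the skip logic above: `entity_supports_aspect` (imported from `datahub.emitter.mcp_builder` in this diff) checks whether a given entity type declares a given aspect, so entity types without the status aspect, such as dataProcessInstance, are skipped. The exact return values shown are based on the code comment above and are otherwise an assumption.

```python
# Illustrative check of which entity types accept the status aspect.
from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.metadata.schema_classes import StatusClass

print(entity_supports_aspect("dataset", StatusClass))              # expected: True
print(entity_supports_aspect("dataProcessInstance", StatusClass))  # expected: False
```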
Empty file.