feat(ingest): ingest last-modified from dbt sources.json (#2729)
kevinhu authored Jun 23, 2021
1 parent e640f5e commit 22a2ed8
Showing 8 changed files with 4,084 additions and 3,124 deletions.
5 changes: 5 additions & 0 deletions metadata-ingestion/README.md
@@ -680,6 +680,10 @@ Pull metadata from dbt artifacts files:
- [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
- This file contains schema data.
- dbt does not record schema data for ephemeral models; as a result, DataHub will show ephemeral models in the lineage, but there will be no associated schema for them.
- [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json)
- This file contains metadata for sources with freshness checks.
- We map dbt's freshness check results onto DataHub's last-modified fields.
- Note that this file is optional; if it is not specified, we'll use the time of ingestion instead as a proxy for last-modified (see the illustrative sketch after this list).
- target_platform:
- The data platform you are enriching with dbt metadata.
- [data platforms](https://github.com/linkedin/datahub/blob/master/gms/impl/src/main/resources/DataPlatformInfo.json)
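
For reference, the sources.json artifact exposes a results list, and the ingestion code below only reads each entry's unique_id and max_loaded_at. A minimal sketch of the shape being consumed (the id and timestamp values are illustrative, not taken from a real dbt run):

# Hypothetical excerpt of dbt's sources.json "results" list. Only the two
# fields read by this connector (unique_id, max_loaded_at) are shown;
# real entries carry additional freshness metadata.
sources_results = [
    {
        "unique_id": "source.my_project.my_schema.orders",  # assumed example id
        "max_loaded_at": "2021-06-23T00:00:00+00:00",        # ISO-8601 freshness timestamp
    }
]

The connector turns max_loaded_at into the lastModified audit stamp on the dataset's schema metadata, as the dbt.py changes below show.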
@@ -694,6 +698,7 @@ source:
config:
manifest_path: "./path/dbt/manifest_file.json"
catalog_path: "./path/dbt/catalog_file.json"
sources_path: "./path/dbt/sources_file.json" # (optional, used for freshness checks)
target_platform: "postgres" # optional, eg "postgres", "snowflake", etc.
load_schemas: True or False
node_type_pattern: # optional
1 change: 1 addition & 0 deletions metadata-ingestion/scripts/update_golden_files.sh
@@ -17,6 +17,7 @@ cp tmp/test_dbt_ingest0/dbt_mces.json tests/integration/dbt/dbt_mces_golden.json
cp tmp/test_glue_ingest0/glue_mces.json tests/unit/glue/glue_mces_golden.json
cp tmp/test_lookml_ingest0/lookml_mces.json tests/integration/lookml/expected_output.json
cp tmp/test_looker_ingest0/looker_mces.json tests/integration/looker/expected_output.json
cp tmp/test_dbt_ingest0/dbt_mces.json tests/integration/dbt/dbt_mces_golden.json

# Print success message.
set +x
76 changes: 53 additions & 23 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
@@ -1,7 +1,9 @@
import json
import logging
import time
from typing import Any, Dict, Iterable, List
from typing import Any, Dict, Iterable, List, Optional

import dateutil.parser

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
@@ -41,6 +43,7 @@
class DBTConfig(ConfigModel):
manifest_path: str
catalog_path: str
sources_path: Optional[str]
env: str = "PROD"
target_platform: str
load_schemas: bool
@@ -69,6 +72,7 @@ class DBTNode:
columns: List[DBTColumn]
upstream_urns: List[str]
datahub_urn: str
max_loaded_at: Optional[str]

def __repr__(self):
fields = tuple("{}={}".format(k, v) for k, v in self.__dict__.items())
@@ -93,13 +97,17 @@ def get_columns(catalog_node: dict) -> List[DBTColumn]:


def extract_dbt_entities(
nodes: Dict[str, dict],
catalog: Dict[str, dict],
nodes: Dict[str, Dict[str, Any]],
catalog: Dict[str, Dict[str, Any]],
sources_results: List[Dict[str, Any]],
load_catalog: bool,
target_platform: str,
environment: str,
node_type_pattern: AllowDenyPattern,
) -> List[DBTNode]:

sources_by_id = {x["unique_id"]: x for x in sources_results}

dbt_entities = []
for key, node in nodes.items():
dbtNode = DBTNode()
@@ -112,7 +120,8 @@ def extract_dbt_entities(
dbtNode.schema = node["schema"]
dbtNode.dbt_file_path = node["original_file_path"]
dbtNode.node_type = node["resource_type"]
if "identifier" in node and load_catalog is False:
dbtNode.max_loaded_at = sources_by_id.get(key, {}).get("max_loaded_at")
if "identifier" in node and not load_catalog:
dbtNode.name = node["identifier"]
else:
dbtNode.name = node["name"]
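
The sources_by_id lookup added above reduces to indexing the freshness results by unique_id and falling back to None when a node has no freshness check. A standalone sketch of that pattern, with hypothetical helper names that are not part of the module:

from typing import Any, Dict, List, Optional

def index_freshness_results(sources_results: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    # Mirror sources_by_id: one entry per dbt unique_id.
    return {entry["unique_id"]: entry for entry in sources_results}

def lookup_max_loaded_at(sources_by_id: Dict[str, Dict[str, Any]], key: str) -> Optional[str]:
    # Nodes without a freshness result fall through to None, so the caller
    # can later substitute the ingestion time.
    return sources_by_id.get(key, {}).get("max_loaded_at")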
@@ -158,34 +167,46 @@ def extract_dbt_entities(
def loadManifestAndCatalog(
manifest_path: str,
catalog_path: str,
sources_path: Optional[str],
load_catalog: bool,
target_platform: str,
environment: str,
node_type_pattern: AllowDenyPattern,
) -> List[DBTNode]:
with open(manifest_path, "r") as manifest:
with open(catalog_path, "r") as catalog:
dbt_manifest_json = json.load(manifest)
dbt_catalog_json = json.load(catalog)
dbt_manifest_json = json.load(manifest)

manifest_nodes = dbt_manifest_json["nodes"]
manifest_sources = dbt_manifest_json["sources"]
with open(catalog_path, "r") as catalog:
dbt_catalog_json = json.load(catalog)

all_manifest_entities = {**manifest_nodes, **manifest_sources}
if sources_path is not None:
with open(sources_path, "r") as sources:
dbt_sources_json = json.load(sources)
sources_results = dbt_sources_json["results"]
else:
sources_results = {}

catalog_nodes = dbt_catalog_json["nodes"]
catalog_sources = dbt_catalog_json["sources"]
manifest_nodes = dbt_manifest_json["nodes"]
manifest_sources = dbt_manifest_json["sources"]

all_catalog_entities = {**catalog_nodes, **catalog_sources}
all_manifest_entities = {**manifest_nodes, **manifest_sources}

return extract_dbt_entities(
all_manifest_entities,
all_catalog_entities,
load_catalog,
target_platform,
environment,
node_type_pattern,
)
catalog_nodes = dbt_catalog_json["nodes"]
catalog_sources = dbt_catalog_json["sources"]

all_catalog_entities = {**catalog_nodes, **catalog_sources}

nodes = extract_dbt_entities(
all_manifest_entities,
all_catalog_entities,
sources_results,
load_catalog,
target_platform,
environment,
node_type_pattern,
)

return nodes


def get_urn_from_dbtNode(
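
Because sources.json is optional, the loader only parses it when a path is configured and otherwise hands an empty collection to extract_dbt_entities. A minimal sketch of that optional-artifact handling, using a hypothetical standalone helper rather than the module's own function:

import json
from typing import Any, Dict, List, Optional

def load_sources_results(sources_path: Optional[str]) -> List[Dict[str, Any]]:
    # No sources.json configured -> no freshness data; downstream code then
    # falls back to the ingestion time for lastModified.
    if sources_path is None:
        return []
    with open(sources_path, "r") as sources:
        return json.load(sources)["results"]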
@@ -308,15 +329,23 @@ def get_schema_metadata(

canonical_schema.append(field)

actor, sys_time = "urn:li:corpuser:dbt_executor", int(time.time()) * 1000
actor, sys_time = "urn:li:corpuser:dbt_executor", int(time.time() * 1000)

last_modified = sys_time

if node.max_loaded_at is not None:
last_modified = int(
dateutil.parser.parse(node.max_loaded_at).timestamp() * 1000
)

return SchemaMetadata(
schemaName=node.dbt_name,
platform=f"urn:li:dataPlatform:{platform}",
version=0,
hash="",
platformSchema=MySqlDDL(tableSchema=""),
created=AuditStamp(time=sys_time, actor=actor),
lastModified=AuditStamp(time=sys_time, actor=actor),
lastModified=AuditStamp(time=last_modified, actor=actor),
fields=canonical_schema,
)
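
The lastModified handling above converts dbt's ISO-8601 max_loaded_at into the epoch-millisecond value AuditStamp expects, and falls back to the current time when no freshness data exists. A small worked example of just that conversion (the timestamp literal is illustrative):

import time
import dateutil.parser

max_loaded_at = "2021-06-23T00:00:00+00:00"  # illustrative value from sources.json

if max_loaded_at is not None:
    # 1624406400000 -- milliseconds since the Unix epoch
    last_modified = int(dateutil.parser.parse(max_loaded_at).timestamp() * 1000)
else:
    # No freshness data: use the ingestion time as a proxy.
    last_modified = int(time.time() * 1000)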

@@ -340,6 +369,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
nodes = loadManifestAndCatalog(
self.config.manifest_path,
self.config.catalog_path,
self.config.sources_path,
self.config.load_schemas,
self.config.target_platform,
self.config.env,