Skip to content

Commit

Permalink
feat(docs) add fine-grained lineage section to dataset doc. Include e…
Browse files Browse the repository at this point in the history
…xamples in the lineage doc. Accordingly segregate ingestion examples of fine-grained lineage of dataset and datajob.
  • Loading branch information
ksrinath committed Feb 25, 2022
1 parent 3d6db93 commit 19b8fa4
Show file tree
Hide file tree
Showing 4 changed files with 329 additions and 45 deletions.
2 changes: 2 additions & 0 deletions docs/lineage/sample_code.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ The following samples will cover emitting dataset-to-dataset, dataset-to-job-to-
- [lineage_job_dataflow.py](../../metadata-ingestion/examples/library/lineage_job_dataflow.py) - emits the job-to-dataflow lineage via REST as MetadataChangeProposalWrapper.
- [lineage_emitter_rest.py](../../metadata-ingestion/examples/library/lineage_emitter_rest.py) - emits simple dataset-to-dataset lineage via REST as MetadataChangeEvent.
- [lineage_emitter_kafka.py](../../metadata-ingestion/examples/library/lineage_emitter_kafka.py) - emits simple dataset-to-dataset lineage via Kafka as MetadataChangeEvent.
- [lineage_emitter_dataset_finegrained.py](../../metadata-ingestion/examples/library/lineage_emitter_dataset_finegrained.py) - emits fine-grained dataset-dataset lineage via REST as MetadataChangeProposalWrapper.
- [lineage_emitter_datajob_finegrained.py](../../metadata-ingestion/examples/library/lineage_emitter_datajob_finegrained.py) - emits fine-grained datajob-dataset lineage via REST as MetadataChangeProposalWrapper.
- [Datahub Snowflake Lineage](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py#L249) - emits Datahub's Snowflake lineage as MetadataChangeProposalWrapper.
- [Datahub Bigquery Lineage](https://github.com/linkedin/datahub/blob/a1bf95307b040074c8d65ebb86b5eb177fdcd591/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py#L229) - emits Datahub's Bigquery lineage as MetadataChangeProposalWrapper.
- [Datahub Dbt Lineage](https://github.com/linkedin/datahub/blob/a9754ebe83b6b73bc2bfbf49d9ebf5dbd2ca5a8f/metadata-ingestion/src/datahub/ingestion/source/dbt.py#L625,L630) - emits Datahub's DBT lineage as MetadataChangeEvent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ def datasetUrn(tbl):

def fldUrn(tbl, fld):
return builder.make_schema_field_urn(datasetUrn(tbl), fld);
#f"urn:li:schemaField:({datasetUrn(tbl)}, {fld})"

# Lineage of fields in a dataset (view)
# c1 <-- unknownFunc(bar2.c1, bar4.c1)
# c2 <-- myfunc(bar3.c2)
# {c3,c4} <-- unknownFunc(bar2.c2, bar2.c3, bar3.c1)
# c5 <-- unknownFunc(bar3)
# {c6,c7} <-- unknownFunc(bar4)

# note that the semantic of the "transformOperation" value is contextual.
# Lineage of fields output by a job
# bar.c1 <-- unknownFunc(bar2.c1, bar4.c1)
# bar.c2 <-- myfunc(bar3.c2)
# {bar.c3,bar.c4} <-- unknownFunc(bar2.c2, bar2.c3, bar3.c1)
# bar.c5 <-- unknownFunc(bar3)
# {bar.c6,bar.c7} <-- unknownFunc(bar4)
# bar2.c9 has no upstream i.e. its values are somehow created independently within this job.

# Note that the semantic of the "transformOperation" value is contextual.
# In above example, it is regarded as some kind of UDF; but it could also be an expression etc.

fineGrainedLineages=[
Expand Down Expand Up @@ -61,46 +61,18 @@ def fldUrn(tbl, fld):
upstreamType=FineGrainedLineageUpstreamType.DATASET,
upstreams=[datasetUrn("bar4")],
downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
downstreams=[fldUrn("bar", "c6"), fldUrn("bar", "c7")])
]

downstreams=[fldUrn("bar", "c6"), fldUrn("bar", "c7")]),

# this is just to check if any conflicts with existing Upstream, particularly the DownstreamOf relationship
upstream = Upstream(dataset=datasetUrn("bar2"), type=DatasetLineageType.TRANSFORMED)

fieldLineages = UpstreamLineage(upstreams=[upstream], fineGrainedLineages=fineGrainedLineages)

viewLineageMcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=datasetUrn("bar"),
aspectName="upstreamLineage",
aspect=fieldLineages
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(viewLineageMcp)

# Lineage of fields output by a job
# The lineages are primarily the same as the above example.
#
# In addition to the earlier lineages,
# 1. the lineage of output col bar.c9 is unknown. So there is no lineage for it.
# 2. output col bar2.c9 is known to not have any parents i.e. its values are somehow created independently within this job.
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.NONE,
upstreams=[],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar2", "c9")])
]

# The lineage of output col bar.c9 is unknown. So there is no lineage for it above.
# Note that bar2 is an input as well as an output dataset, but some fields are inputs while other fields are outputs.

fineGrainedLineages.append(
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.NONE,
upstreams=[],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar2", "c9")])
)

dataJobInputOutput = DataJobInputOutputClass(
inputDatasets=[datasetUrn("bar2"), datasetUrn("bar3"), datasetUrn("bar4")],
outputDatasets=[datasetUrn("bar"), datasetUrn("bar2")],
Expand All @@ -121,4 +93,9 @@ def fldUrn(tbl, fld):
aspectName="dataJobInputOutput",
aspect=dataJobInputOutput
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(dataJobLineageMcp)
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import datahub.emitter.mce_builder as builder
import json

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
Upstream,
UpstreamLineage
)
from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInputOutputClass

def datasetUrn(tbl):
return builder.make_dataset_urn("postgres", tbl)

def fldUrn(tbl, fld):
return builder.make_schema_field_urn(datasetUrn(tbl), fld);

# Lineage of fields in a dataset
# c1 <-- unknownFunc(bar2.c1, bar4.c1)
# c2 <-- myfunc(bar3.c2)
# {c3,c4} <-- unknownFunc(bar2.c2, bar2.c3, bar3.c1)
# c5 <-- unknownFunc(bar3)
# {c6,c7} <-- unknownFunc(bar4)

# note that the semantic of the "transformOperation" value is contextual.
# In above example, it is regarded as some kind of UDF; but it could also be an expression etc.

fineGrainedLineages=[
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[fldUrn("bar2", "c1"), fldUrn("bar4", "c1")],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar", "c1")]),

FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[fldUrn("bar3","c2")],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar", "c2")],
confidenceScore = 0.8, transformOperation="myfunc"),

FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[fldUrn("bar2","c2"), fldUrn("bar2","c3"), fldUrn("bar3","c1")],
downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
downstreams=[fldUrn("bar", "c3"), fldUrn("bar", "c4")],
confidenceScore = 0.7),

FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.DATASET,
upstreams=[datasetUrn("bar3")],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[fldUrn("bar", "c5")]),

FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.DATASET,
upstreams=[datasetUrn("bar4")],
downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
downstreams=[fldUrn("bar", "c6"), fldUrn("bar", "c7")])
]


# this is just to check if any conflicts with existing Upstream, particularly the DownstreamOf relationship
upstream = Upstream(dataset=datasetUrn("bar2"), type=DatasetLineageType.TRANSFORMED)

fieldLineages = UpstreamLineage(upstreams=[upstream], fineGrainedLineages=fineGrainedLineages)

lineageMcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=datasetUrn("bar"),
aspectName="upstreamLineage",
aspect=fieldLineages
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(lineageMcp)
Loading

0 comments on commit 19b8fa4

Please sign in to comment.