Skip to content

Commit

Permalink
feat(field-level-lineage): Add models for field level lineage (#1936)
Browse files Browse the repository at this point in the history
* feat(field-level-lineage): adding models for field level lineage

adding models for field level lineage. Introduce DatasetFieldUrn as a unique identifier for dataset field
  • Loading branch information
nagarjunakanamarlapudi authored Nov 9, 2020
1 parent 89c7855 commit 7d574d1
Show file tree
Hide file tree
Showing 12 changed files with 378 additions and 4 deletions.
108 changes: 105 additions & 3 deletions gms/api/src/main/snapshot/com.linkedin.dataset.datasets.snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,40 @@
"doc" : "The entity (e.g. a service URN) which performs the change on behalf of the Actor and must be authorized to act as the Actor.",
"optional" : true
} ]
}, {
"type" : "record",
"name" : "BaseFieldMapping",
"namespace" : "com.linkedin.common",
"doc" : "Base model representing field mappings",
"fields" : [ {
"name" : "created",
"type" : "AuditStamp",
"doc" : "Audit stamp containing who reported the field mapping and when"
}, {
"name" : "transformation",
"type" : [ {
"type" : "enum",
"name" : "TransformationType",
"namespace" : "com.linkedin.common.fieldtransformer",
"doc" : "Type of the transformation involved in generating destination fields from source fields.",
"symbols" : [ "BLACKBOX", "IDENTITY" ],
"symbolDocs" : {
"BLACKBOX" : "Field transformation expressed as unknown black box function.",
"IDENTITY" : "Field transformation expressed as Identity function."
}
}, {
"type" : "record",
"name" : "UDFTransformer",
"namespace" : "com.linkedin.common.fieldtransformer",
"doc" : "Field transformation expressed in UDF",
"fields" : [ {
"name" : "udf",
"type" : "string",
"doc" : "A UDF mentioning how the source fields got transformed to destination field. This is the FQCN(Fully Qualified Class Name) of the udf."
} ]
} ],
"doc" : "Transformation function between the fields involved"
} ]
}, {
"type" : "record",
"name" : "ChangeAuditStamps",
Expand Down Expand Up @@ -77,6 +111,38 @@
"owningTeam" : "urn:li:internalTeam:wherehows"
}
}
}, {
"type" : "typeref",
"name" : "DatasetFieldUrn",
"namespace" : "com.linkedin.common",
"doc" : "Standardized dataset field information identifier.",
"ref" : "string",
"java" : {
"class" : "com.linkedin.common.urn.DatasetFieldUrn"
},
"validate" : {
"com.linkedin.common.validator.TypedUrnValidator" : {
"accessible" : true,
"constructable" : true,
"doc" : "Standardized dataset field information identifier",
"entityType" : "datasetField",
"fields" : [ {
"doc" : "Dataset that this dataset field belongs to.",
"name" : "dataset",
"type" : "com.linkedin.common.urn.DatasetUrn"
}, {
"doc" : "Dataset field path",
"maxLength" : 500,
"name" : "fieldPath",
"type" : "string"
} ],
"maxLength" : 807,
"name" : "DatasetField",
"namespace" : "li",
"owners" : [ "urn:li:corpuser:fbar", "urn:li:corpuser:bfoo" ],
"owningTeam" : "urn:li:internalTeam:datahub"
}
}
}, {
"type" : "typeref",
"name" : "DatasetUrn",
Expand Down Expand Up @@ -269,7 +335,7 @@
"type" : "string",
"optional" : true
} ]
}, {
}, "com.linkedin.common.fieldtransformer.TransformationType", "com.linkedin.common.fieldtransformer.UDFTransformer", {
"type" : "record",
"name" : "Dataset",
"namespace" : "com.linkedin.dataset",
Expand Down Expand Up @@ -787,7 +853,30 @@
"doc" : "Upstream lineage metadata of the dataset",
"optional" : true
} ]
}, "com.linkedin.dataset.DatasetDeprecation", "com.linkedin.dataset.DatasetKey", "com.linkedin.dataset.DatasetLineageType", {
}, "com.linkedin.dataset.DatasetDeprecation", {
"type" : "record",
"name" : "DatasetFieldMapping",
"namespace" : "com.linkedin.dataset",
"doc" : "Representation of mapping between fields in source dataset to the field in destination dataset",
"include" : [ "com.linkedin.common.BaseFieldMapping" ],
"fields" : [ {
"name" : "sourceFields",
"type" : {
"type" : "array",
"items" : {
"type" : "typeref",
"name" : "DatasetFieldUpstream",
"doc" : "Upstreams of a dataset field.",
"ref" : [ "com.linkedin.common.DatasetFieldUrn" ]
}
},
"doc" : "Source fields from which the fine grained lineage is derived"
}, {
"name" : "destinationField",
"type" : "com.linkedin.common.DatasetFieldUrn",
"doc" : "Destination field which is derived from source fields"
} ]
}, "com.linkedin.dataset.DatasetFieldUpstream", "com.linkedin.dataset.DatasetKey", "com.linkedin.dataset.DatasetLineageType", {
"type" : "record",
"name" : "DatasetProperties",
"namespace" : "com.linkedin.dataset",
Expand Down Expand Up @@ -819,6 +908,19 @@
"doc" : "A key-value map to capture any other non-standardized properties for the dataset",
"default" : { }
} ]
}, {
"type" : "record",
"name" : "DatasetUpstreamLineage",
"namespace" : "com.linkedin.dataset",
"doc" : "Fine Grained upstream lineage for fields in a dataset",
"fields" : [ {
"name" : "fieldMappings",
"type" : {
"type" : "array",
"items" : "DatasetFieldMapping"
},
"doc" : "Upstream to downstream field level lineage mappings"
} ]
}, {
"type" : "record",
"name" : "Downstream",
Expand Down Expand Up @@ -868,7 +970,7 @@
"name" : "DatasetAspect",
"namespace" : "com.linkedin.metadata.aspect",
"doc" : "A union of all supported metadata aspects for a Dataset",
"ref" : [ "com.linkedin.dataset.DatasetProperties", "com.linkedin.dataset.DatasetDeprecation", "com.linkedin.dataset.UpstreamLineage", "com.linkedin.common.InstitutionalMemory", "com.linkedin.common.Ownership", "com.linkedin.common.Status", "com.linkedin.schema.SchemaMetadata" ]
"ref" : [ "com.linkedin.dataset.DatasetProperties", "com.linkedin.dataset.DatasetDeprecation", "com.linkedin.dataset.DatasetUpstreamLineage", "com.linkedin.dataset.UpstreamLineage", "com.linkedin.common.InstitutionalMemory", "com.linkedin.common.Ownership", "com.linkedin.common.Status", "com.linkedin.schema.SchemaMetadata" ]
}, {
"type" : "record",
"name" : "AggregationMetadata",
Expand Down
5 changes: 4 additions & 1 deletion li-utils/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ apply plugin: 'java'
apply plugin: 'pegasus'

dependencies {
dataModel externalDependency.gmaCoreModels
compile spec.product.pegasus.data

dataModel externalDependency.gmaCoreModels

testCompile externalDependency.assertJ
}

idea {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.linkedin.common.urn;

import com.linkedin.common.FabricType;
import com.linkedin.data.template.Custom;
import com.linkedin.data.template.DirectCoercer;
import com.linkedin.data.template.TemplateOutputCastException;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


/**
 * Standardized dataset field information identifier.
 *
 * <p>The string form is
 * {@code urn:li:datasetField:(urn:li:dataset:(urn:li:dataPlatform:<platform>,<datasetName>,<fabric>),<fieldPath>)}.
 * Instances keep both the owning {@link DatasetUrn} and the field path.
 */
public class DatasetFieldUrn extends Urn {

  // uniquely identifies urn's key type
  public static final String ENTITY_TYPE = "datasetField";

  // urn pattern
  // NOTE(review): every group is a greedy ".+", so a dataset name that itself contains ',' is split
  // between the dataPlatform and datasetName groups when parsing. Confirm whether such names can occur.
  private static final Pattern DATASET_FIELD_URN_PATTERN = Pattern.compile(
      "urn:li:datasetField:\\(urn:li:dataset:\\(urn:li:dataPlatform:(?<dataPlatform>.+),(?<datasetName>.+),(?<fabric>.+)\\),(?<fieldPath>.+)\\)");

  /**
   * Dataset urn of the datasetFieldUrn
   */
  private final DatasetUrn _dataset;

  /**
   * Field of datasetFieldUrn
   */
  private final String _fieldPath;

  static {
    // Presumably forces DatasetUrn's own static registration to run before this class is used as a
    // custom type -- TODO confirm against Custom.initializeCustomClass.
    Custom.initializeCustomClass(DatasetUrn.class);

    // Coercer between the String wire representation and DatasetFieldUrn.
    Custom.registerCoercer(new DirectCoercer<DatasetFieldUrn>() {

      @Override
      public String coerceInput(DatasetFieldUrn object) throws ClassCastException {
        return object.toString();
      }

      @Override
      public DatasetFieldUrn coerceOutput(Object object) throws TemplateOutputCastException {
        if (!(object instanceof String)) {
          throw new TemplateOutputCastException("Output '" + object + "' is not a String, and cannot be coerced to "
              + DatasetFieldUrn.class.getName());
        }
        try {
          return DatasetFieldUrn.deserialize((String) object);
        } catch (URISyntaxException e) {
          throw new TemplateOutputCastException("Deserializing output '" + object + "' failed", e);
        }
      }
    }, DatasetFieldUrn.class);
  }

  /**
   * Creates a new instance of a {@link DatasetFieldUrn }.
   *
   * @param dataset Dataset that this dataset field belongs to.
   * @param fieldPath Dataset field path or column name
   */
  public DatasetFieldUrn(DatasetUrn dataset, String fieldPath) {
    this(dataset.getPlatformEntity().getPlatformNameEntity(), dataset.getDatasetNameEntity(), dataset.getOriginEntity(),
        fieldPath);
  }

  /**
   * Creates a new instance of a {@link DatasetFieldUrn} from the individual parts of the owning dataset.
   *
   * @param dataPlatform platform name, used to build the {@code urn:li:dataPlatform:} part
   * @param datasetName name of the dataset the field belongs to
   * @param fabricType fabric of the dataset; its {@code name()} is embedded in the urn
   * @param fieldPath Dataset field path or column name
   */
  public DatasetFieldUrn(String dataPlatform, String datasetName, FabricType fabricType, String fieldPath) {
    super(ENTITY_TYPE, String.format("(urn:li:dataset:(urn:li:dataPlatform:%s,%s,%s),%s)", dataPlatform, datasetName,
        fabricType.name(), fieldPath));
    this._dataset = new DatasetUrn(new DataPlatformUrn(dataPlatform), datasetName, fabricType);
    this._fieldPath = fieldPath;
  }

  /**
   * @return the urn of the dataset this field belongs to
   */
  public DatasetUrn getDatasetEntity() {
    return _dataset;
  }

  /**
   * @return the field path (or column name) inside the dataset
   */
  public String getFieldPathEntity() {
    return _fieldPath;
  }

  /**
   * Creates an instance of a DatasetFieldUrn from a raw urn string.
   *
   * @param rawUrn The raw urn input to convert to a full DatasetFieldUrn instance.
   * @return {@link DatasetFieldUrn} dataset Field Urn
   * @throws URISyntaxException if {@code rawUrn} does not match the dataset field urn pattern, or if its
   *     fabric component is not a known {@link FabricType}
   */
  public static DatasetFieldUrn deserialize(String rawUrn) throws URISyntaxException {
    final Matcher matcher = DATASET_FIELD_URN_PATTERN.matcher(rawUrn);
    if (!matcher.matches()) {
      throw new URISyntaxException(rawUrn,
          String.format("urn does not match dataset field urn pattern %s", DATASET_FIELD_URN_PATTERN.toString()));
    }

    final String dataPlatform = matcher.group("dataPlatform");
    final String datasetName = matcher.group("datasetName");
    final String fabric = matcher.group("fabric");
    final String fieldName = matcher.group("fieldPath");

    final FabricType fabricType;
    try {
      fabricType = FabricType.valueOf(fabric);
    } catch (IllegalArgumentException e) {
      // Report an unknown fabric through the declared checked exception so that callers (including the
      // coercer above) handle it the same way as any other malformed urn.
      final URISyntaxException invalidFabric =
          new URISyntaxException(rawUrn, String.format("unknown fabric type '%s' in dataset field urn", fabric));
      invalidFabric.initCause(e);
      throw invalidFabric;
    }
    return new DatasetFieldUrn(dataPlatform, datasetName, fabricType, fieldName);
  }
}
28 changes: 28 additions & 0 deletions li-utils/src/main/pegasus/com/linkedin/common/DatasetFieldUrn.pdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace com.linkedin.common

// The @java.class binding below points at com.linkedin.common.urn.DatasetFieldUrn, the class that
// registers the String coercer for this typeref. The "entityType" value matches that class's
// ENTITY_TYPE constant ("datasetField").
/**
* Standardized dataset field information identifier.
*/
@java.class = "com.linkedin.common.urn.DatasetFieldUrn"
@validate.`com.linkedin.common.validator.TypedUrnValidator` = {
"accessible" : true,
"owningTeam" : "urn:li:internalTeam:datahub",
"entityType" : "datasetField",
"constructable" : true,
"namespace" : "li",
"name" : "DatasetField",
"doc" : "Standardized dataset field information identifier",
"owners" : [ "urn:li:corpuser:fbar", "urn:li:corpuser:bfoo" ],
"fields" : [ {
"type" : "com.linkedin.common.urn.DatasetUrn",
"name" : "dataset",
"doc" : "Dataset that this dataset field belongs to."
}, {
"name" : "fieldPath",
"doc" : "Dataset field path",
"type" : "string",
"maxLength" : 500
} ],
"maxLength" : 807
}
typeref DatasetFieldUrn = string
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.linkedin.common.urn;

import com.linkedin.common.FabricType;
import java.net.URISyntaxException;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;


/**
 * Unit tests for {@link DatasetFieldUrn}: parsing from a raw urn string, construction from parts, and
 * equality of the two constructors.
 */
public class DatasetFieldUrnTest {

  private static final String PLATFORM = "fooPlatform";
  private static final String DATASET_NAME = "fooName";
  private static final String FIELD_NAME = "fooField";
  private static final FabricType FABRIC_TYPE = FabricType.PROD;

  /**
   * Parses a raw urn string and checks every extracted component, then checks that
   * {@code toString()} reproduces the input string.
   */
  @Test
  public void testSerialization() throws URISyntaxException {
    final String datasetFieldString =
        String.format("urn:li:datasetField:(urn:li:dataset:(urn:li:dataPlatform:%s,%s,%s),%s)", PLATFORM, DATASET_NAME,
            FABRIC_TYPE, FIELD_NAME);

    final DatasetFieldUrn datasetFieldUrn = DatasetFieldUrn.deserialize(datasetFieldString);
    final DatasetUrn datasetUrn = datasetFieldUrn.getDatasetEntity();

    Assertions.assertThat(datasetFieldUrn.getFieldPathEntity()).isEqualTo(FIELD_NAME);
    Assertions.assertThat(datasetUrn.getDatasetNameEntity()).isEqualTo(DATASET_NAME);
    Assertions.assertThat(datasetUrn.getPlatformEntity().getPlatformNameEntity()).isEqualTo(PLATFORM);
    Assertions.assertThat(datasetUrn.getOriginEntity()).isEqualTo(FABRIC_TYPE);
    // The description has to be set before the assertion runs; set afterwards it is never reported.
    Assertions.assertThat(datasetFieldUrn.toString())
        .describedAs("deserialization followed by serialization should produce the same urn string")
        .isEqualTo(datasetFieldString);
  }

  /**
   * Builds a urn from its individual parts and checks that the getters return those parts.
   */
  @Test
  public void testCreateUrn() {
    final DatasetFieldUrn datasetFieldUrn = new DatasetFieldUrn(PLATFORM, DATASET_NAME, FABRIC_TYPE, FIELD_NAME);

    final DatasetUrn datasetUrn = datasetFieldUrn.getDatasetEntity();

    Assertions.assertThat(datasetFieldUrn.getFieldPathEntity()).isEqualTo(FIELD_NAME);
    Assertions.assertThat(datasetUrn.getDatasetNameEntity()).isEqualTo(DATASET_NAME);
    Assertions.assertThat(datasetUrn.getPlatformEntity().getPlatformNameEntity()).isEqualTo(PLATFORM);
    Assertions.assertThat(datasetUrn.getOriginEntity()).isEqualTo(FABRIC_TYPE);
  }

  /**
   * Checks that the (DatasetUrn, fieldPath) constructor and the four-argument constructor produce
   * equal urns for the same inputs.
   */
  @Test
  public void testUrnConstructors() {
    final DatasetFieldUrn datasetFieldUrn1 = new DatasetFieldUrn(PLATFORM, DATASET_NAME, FABRIC_TYPE, FIELD_NAME);
    final DatasetUrn datasetUrn = datasetFieldUrn1.getDatasetEntity();
    final DatasetFieldUrn datasetFieldUrn2 = new DatasetFieldUrn(datasetUrn, FIELD_NAME);

    Assertions.assertThat(datasetFieldUrn1).isEqualTo(datasetFieldUrn2);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace com.linkedin.common

import com.linkedin.common.fieldtransformer.TransformationType
import com.linkedin.common.fieldtransformer.UDFTransformer

/**
* Base model representing field mappings
*/
record BaseFieldMapping {
/**
* Audit stamp containing who reported the field mapping and when
*/
created: AuditStamp

// Either a TransformationType symbol or a UDFTransformer record.
/**
* Transformation function between the fields involved
*/
transformation: union [TransformationType, UDFTransformer]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace com.linkedin.common.fieldtransformer

// Used as one member of the `transformation` union in com.linkedin.common.BaseFieldMapping;
// the other member is UDFTransformer.
/**
* Type of the transformation involved in generating destination fields from source fields.
*/
enum TransformationType {
/**
* Field transformation expressed as unknown black box function.
*/
BLACKBOX,

/**
* Field transformation expressed as Identity function.
*/
IDENTITY
}
Loading

0 comments on commit 7d574d1

Please sign in to comment.