
fix(ingest/spark): Bumping OpenLineage version to 0.14.0 (#10559)
treff7es authored May 29, 2024
1 parent 37bc423 commit d78287c
Showing 11 changed files with 97 additions and 328 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.13.1'
ext.openLineageVersion = '1.14.0'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
@@ -9,8 +9,6 @@
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.dataset.FineGrainedLineageDownstreamType;
import com.linkedin.dataset.FineGrainedLineageUpstreamType;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -57,48 +55,6 @@ public UpstreamLineagePatchBuilder removeUpstream(@Nonnull DatasetUrn datasetUrn
return this;
}

/**
* Adds a field as a fine grained upstream
*
* @param schemaFieldUrn a schema field to be marked as upstream, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the upstream lineage type, either Field or Field Set
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedUpstreamField(
@Nonnull Urn schemaFieldUrn,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageUpstreamType type) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString();
} else {
finalType = type.toString();
}

pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "upstreamType"
+ "/"
+ finalType
+ "/"
+ encodeValueUrn(schemaFieldUrn),
instance.numberNode(finalConfidenceScore)));

return this;
}

/**
* Adds a field as a fine grained upstream
*
@@ -135,54 +91,13 @@ public UpstreamLineagePatchBuilder addFineGrainedUpstreamField(
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ downstreamSchemaField
+ encodeValueUrn(downstreamSchemaField)
+ "/"
+ finalQueryUrn
+ "/"
+ encodeValueUrn(upstreamSchemaField),
instance.numberNode(finalConfidenceScore)));

return this;
}

/**
* Adds a field as a fine grained downstream
*
* @param schemaFieldUrn a schema field to be marked as downstream, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the downstream lineage type, either Field or Field Set
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedDownstreamField(
@Nonnull Urn schemaFieldUrn,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageDownstreamType type) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString();
} else {
finalType = type.toString();
}
fineGrainedLineageNode));

pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "downstreamType"
+ "/"
+ finalType
+ "/"
+ encodeValueUrn(schemaFieldUrn),
instance.numberNode(finalConfidenceScore)));
return this;
}
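
With the two overloads above removed, a minimal usage sketch of the builder methods that remain visible in this diff (the URNs and lineage type below are illustrative, not part of the commit):

```java
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.mxe.MetadataChangeProposal;
// plus an import for UpstreamLineagePatchBuilder itself

class UpstreamPatchSketch {
  // Illustrative only: build a patch MCP that appends a single upstream edge.
  static MetadataChangeProposal upstreamPatch() throws Exception {
    DatasetUrn upstream =
        DatasetUrn.createFromString(
            "urn:li:dataset:(urn:li:dataPlatform:hive,db.upstream_table,PROD)");
    DatasetUrn downstream =
        DatasetUrn.createFromString(
            "urn:li:dataset:(urn:li:dataPlatform:hive,db.downstream_table,PROD)");
    return new UpstreamLineagePatchBuilder()
        .urn(downstream)
        .addUpstream(upstream, DatasetLineageType.TRANSFORMED)
        .build();
  }
}
```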

@@ -27,7 +27,7 @@ public class DatahubOpenlineageConfig {
@Builder.Default private final boolean captureColumnLevelLineage = true;
@Builder.Default private final DataJobUrn parentJobUrn = null;
// This is disabled until column-level patch support is fixed in GMS
@Builder.Default private final boolean usePatch = false;
@Builder.Default private final boolean usePatch = true;
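
The flipped default above means lineage is now emitted as patches unless explicitly disabled. A hedged sketch of opting back into overwrite behaviour, assuming the Lombok builder implied by the @Builder.Default annotations (the builder method name is inferred, not shown in this hunk):

```java
// Sketch only: keep the pre-change overwrite semantics by disabling patches.
DatahubOpenlineageConfig config =
    DatahubOpenlineageConfig.builder()
        .usePatch(false)
        .build();
```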

public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {
@@ -423,6 +423,45 @@ private static StringMap generateCustomProperties(
for (Map.Entry<String, OpenLineage.RunFacet> entry :
event.getRun().getFacets().getAdditionalProperties().entrySet()) {
switch (entry.getKey()) {
case "spark_jobDetails":
if (entry.getValue().getAdditionalProperties().get("jobId") != null) {
customProperties.put(
"jobId",
(String) entry.getValue().getAdditionalProperties().get("jobId").toString());
}
if (entry.getValue().getAdditionalProperties().get("jobDescription") != null) {
customProperties.put(
"jobDescription",
(String) entry.getValue().getAdditionalProperties().get("jobDescription"));
}
if (entry.getValue().getAdditionalProperties().get("jobGroup") != null) {
customProperties.put(
"jobGroup", (String) entry.getValue().getAdditionalProperties().get("jobGroup"));
}
if (entry.getValue().getAdditionalProperties().get("jobCallSite") != null) {
customProperties.put(
"jobCallSite",
(String) entry.getValue().getAdditionalProperties().get("jobCallSite"));
}
case "processing_engine":
if (entry.getValue().getAdditionalProperties().get("processing-engine") != null) {
customProperties.put(
"processing-engine",
(String) entry.getValue().getAdditionalProperties().get("name"));
}
if (entry.getValue().getAdditionalProperties().get("processing-engine-version") != null) {
customProperties.put(
"processing-engine-version",
(String) entry.getValue().getAdditionalProperties().get("version"));
}
if (entry.getValue().getAdditionalProperties().get("openlineage-adapter-version")
!= null) {
customProperties.put(
"openlineage-adapter-version",
(String)
entry.getValue().getAdditionalProperties().get("openlineageAdapterVersion"));
}

case "spark_version":
{
if (entry.getValue().getAdditionalProperties().get("spark-version") != null) {
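
For orientation on the (truncated) switch above, a hedged illustration of the facet-to-property mapping it performs; the facet and property names come from the hunk, while the sample values are invented:

```java
import java.util.Map;

// Given run facets roughly like:
//   spark_jobDetails  -> { "jobId": 3, "jobGroup": "etl", "jobCallSite": "collect at App.scala:42" }
//   processing_engine -> { "name": "spark", "version": "3.4.1", "openlineageAdapterVersion": "1.14.0" }
// the loop is expected to add custom properties equivalent to:
Map<String, String> expected =
    Map.of(
        "jobId", "3",
        "jobGroup", "etl",
        "jobCallSite", "collect at App.scala:42",
        "processing-engine", "spark",
        "processing-engine-version", "3.4.1",
        "openlineage-adapter-version", "1.14.0");
```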
@@ -174,7 +174,7 @@ private void generateDataJobInputOutputMcp(
List<MetadataChangeProposal> mcps) {
DataJobInputOutput dataJobInputOutput = new DataJobInputOutput();
log.info("Adding DataJob edges to {}", jobUrn);
if (config.isUsePatch()) {
if (config.isUsePatch() && (!parentJobs.isEmpty() || !inSet.isEmpty() || !outSet.isEmpty())) {
DataJobInputOutputPatchBuilder dataJobInputOutputPatchBuilder =
new DataJobInputOutputPatchBuilder().urn(jobUrn);
for (DatahubDataset dataset : inSet) {
@@ -263,39 +263,38 @@ private Pair<UrnArray, EdgeArray> processDownstreams(

if (dataset.getLineage() != null) {
if (config.isUsePatch()) {
UpstreamLineagePatchBuilder upstreamLineagePatchBuilder =
new UpstreamLineagePatchBuilder().urn(dataset.getUrn());
for (Upstream upstream : dataset.getLineage().getUpstreams()) {
upstreamLineagePatchBuilder.addUpstream(upstream.getDataset(), upstream.getType());
}

log.info("Adding FineGrainedLineage to {}", dataset.getUrn());
for (FineGrainedLineage fineGrainedLineage :
Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) {
for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) {
upstreamLineagePatchBuilder.addFineGrainedUpstreamField(
upstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
fineGrainedLineage.getUpstreamType());
if (!dataset.getLineage().getUpstreams().isEmpty()) {
UpstreamLineagePatchBuilder upstreamLineagePatchBuilder =
new UpstreamLineagePatchBuilder().urn(dataset.getUrn());
for (Upstream upstream : dataset.getLineage().getUpstreams()) {
upstreamLineagePatchBuilder.addUpstream(
upstream.getDataset(), upstream.getType());
}
for (Urn downstream : Objects.requireNonNull(fineGrainedLineage.getDownstreams())) {
upstreamLineagePatchBuilder.addFineGrainedDownstreamField(
downstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
fineGrainedLineage.getDownstreamType());

log.info("Adding FineGrainedLineage to {}", dataset.getUrn());
for (FineGrainedLineage fineGrainedLineage :
Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) {
for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) {
for (Urn downstream :
Objects.requireNonNull(fineGrainedLineage.getDownstreams())) {
upstreamLineagePatchBuilder.addFineGrainedUpstreamField(
downstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
upstream,
null);
}
}
}
MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build();
log.info(
"upstreamLineagePatch: {}",
mcp.getAspect().getValue().asString(Charset.defaultCharset()));
mcps.add(mcp);
} else {
addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps);
}
MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build();
log.info(
"upstreamLineagePatch: {}",
mcp.getAspect().getValue().asString(Charset.defaultCharset()));
mcps.add(mcp);
} else {
addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps);
}
}
});
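
The reworked loop above now emits one fine-grained edge per (upstream, downstream) field pair. A hedged sketch of that per-pair call, mirroring the argument order visible in the hunk; the comments labelling each argument, and the reading of the final null as an optional query URN, are assumptions:

```java
// Sketch only: one fine-grained lineage edge per field pair.
upstreamLineagePatchBuilder.addFineGrainedUpstreamField(
    downstreamFieldUrn,                        // schema field on the dataset being patched
    fineGrainedLineage.getConfidenceScore(),   // nullable; defaults to full confidence
    "TRANSFORM",                               // fallback transformation operation used above
    upstreamFieldUrn,                          // schema field the column derives from
    null);                                     // assumed optional query URN
```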
@@ -361,7 +360,7 @@ private void generateFlowGlobalTagsAspect(
DatahubOpenlineageConfig config,
List<MetadataChangeProposal> mcps) {
if (flowGlobalTags != null) {
if (config.isUsePatch()) {
if ((config.isUsePatch() && (!flowGlobalTags.getTags().isEmpty()))) {
GlobalTagsPatchBuilder globalTagsPatchBuilder = new GlobalTagsPatchBuilder().urn(flowUrn);
for (TagAssociation tag : flowGlobalTags.getTags()) {
globalTagsPatchBuilder.addTag(tag.getTag(), null);
14 changes: 7 additions & 7 deletions metadata-integration/java/spark-lineage-beta/README.md
@@ -24,15 +24,15 @@ When running jobs using spark-submit, the agent needs to be configured in the co

```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.3
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```

## spark-submit command line

```sh
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.3 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.6 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```

### Configuration Instructions: Amazon EMR
@@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it
stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)

```text
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.3
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the Datahub access token
@@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh
spark = SparkSession.builder
.master("spark://spark-master:7077")
.appName("test-application")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.3")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.6")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server", "http://localhost:8080")
.enableHiveSupport()
@@ -79,7 +79,7 @@ appName("test-application")
config("spark.master","spark://spark-master:7077")
.

config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.3")
config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.6")
.

config("spark.extraListeners","datahub.spark.DatahubSparkListener")
@@ -181,8 +181,8 @@ information like tokens.
| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricsk. You should enable this on Databricks if you want coalesced run . |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled.
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event, which is never called on Databricks. You should enable this on Databricks if you want a coalesced run. |
| spark.datahub.patch.enabled | | true | Set this to true to send lineage as a patch, which appends to rather than overwrites existing Dataset lineage edges. It is enabled by default.
|
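
Since patch-based lineage is now the default, the previous overwrite behaviour can be restored by setting the flag explicitly, for example (a sketch; set it wherever your Spark configs live):

```text
spark.datahub.patch.enabled false
```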

## What to Expect: The Metadata Model
@@ -87,11 +87,10 @@ private static SparkAppContext getSparkAppContext(

public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
long startTime = System.currentTimeMillis();
initializeContextFactoryIfNotInitialized();

log.info("Application start called");
this.appContext = getSparkAppContext(applicationStart);

initializeContextFactoryIfNotInitialized();
listener.onApplicationStart(applicationStart);
long elapsedTime = System.currentTimeMillis() - startTime;
log.info("onApplicationStart completed successfully in {} ms", elapsedTime);
@@ -155,7 +154,8 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
return Optional.empty();
}

private synchronized void loadDatahubConfig(SparkAppContext appContext, Properties properties) {
private synchronized SparkLineageConf loadDatahubConfig(
SparkAppContext appContext, Properties properties) {
long startTime = System.currentTimeMillis();
datahubConf = parseSparkConfig();
SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
@@ -169,14 +169,15 @@ private synchronized void loadDatahubConfig(SparkAppContext appContext, Properti
Optional<Map<String, String>> databricksTags = getDatabricksTags(datahubConf);
this.appContext.setDatabricksTags(databricksTags.orElse(null));
}

log.info("Datahub configuration: {}", datahubConf.root().render());
Optional<DatahubEmitterConfig> restEmitter = initializeEmitter(datahubConf);
SparkLineageConf sparkLineageConf =
SparkLineageConf.toSparkLineageConf(datahubConf, appContext, restEmitter.orElse(null));

emitter.setConfig(sparkLineageConf);
long elapsedTime = System.currentTimeMillis() - startTime;
log.debug("loadDatahubConfig completed successfully in {} ms", elapsedTime);
return sparkLineageConf;
}

public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
@@ -220,7 +221,6 @@ public void onJobStart(SparkListenerJobStart jobStart) {
initializeContextFactoryIfNotInitialized();

log.debug("Job start called");
loadDatahubConfig(this.appContext, jobStart.properties());
listener.onJobStart(jobStart);
long elapsedTime = System.currentTimeMillis() - startTime;
log.debug("onJobStart completed successfully in {} ms", elapsedTime);
@@ -333,10 +333,12 @@ private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, Strin
return;
}
try {
SparkLineageConf datahubConfig = loadDatahubConfig(appContext, null);
SparkOpenLineageConfig config = ArgumentParser.parse(sparkConf);
// Needs to be done before initializing OpenLineageClient
initializeMetrics(config);
emitter = new DatahubEventEmitter(config, appName);
emitter.setConfig(datahubConfig);
contextFactory = new ContextFactory(emitter, meterRegistry, config);
circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build();
OpenLineageSparkListener.init(contextFactory);
@@ -307,7 +307,7 @@ public static boolean isCoalesceEnabled(Config datahubConfig) {

public static boolean isPatchEnabled(Config datahubConfig) {
if (!datahubConfig.hasPath(PATCH_ENABLED)) {
return false;
return true;
}
return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED);
}
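
A brief hedged sketch of how the new default resolves, assuming the Typesafe Config type used in this method and that the snippet is placed in the same (unnamed in this diff) parser class:

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

// Illustrative only.
static void patchDefaultSketch() {
  Config empty = ConfigFactory.empty();
  assert isPatchEnabled(empty); // key absent -> the new default of true applies

  Config optOut = ConfigFactory.parseString(PATCH_ENABLED + " = false");
  assert !isPatchEnabled(optOut); // an explicit opt-out is still honoured
}
```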