Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/spark): Bumping OpenLineage version to 1.14.0 #10559

Merged
merged 10 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.13.1'
ext.openLineageVersion = '1.14.0'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.dataset.FineGrainedLineageDownstreamType;
import com.linkedin.dataset.FineGrainedLineageUpstreamType;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -57,48 +55,6 @@ public UpstreamLineagePatchBuilder removeUpstream(@Nonnull DatasetUrn datasetUrn
return this;
}

/**
* Adds a field as a fine grained upstream
*
* @param schemaFieldUrn a schema field to be marked as upstream, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the upstream lineage type, either Field or Field Set
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedUpstreamField(
@Nonnull Urn schemaFieldUrn,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageUpstreamType type) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString();
} else {
finalType = type.toString();
}

pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "upstreamType"
+ "/"
+ finalType
+ "/"
+ encodeValueUrn(schemaFieldUrn),
instance.numberNode(finalConfidenceScore)));

return this;
}

/**
* Adds a field as a fine grained upstream
*
Expand Down Expand Up @@ -135,54 +91,13 @@ public UpstreamLineagePatchBuilder addFineGrainedUpstreamField(
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ downstreamSchemaField
+ encodeValueUrn(downstreamSchemaField)
+ "/"
+ finalQueryUrn
+ "/"
+ encodeValueUrn(upstreamSchemaField),
instance.numberNode(finalConfidenceScore)));

return this;
}

/**
* Adds a field as a fine grained downstream
*
* @param schemaFieldUrn a schema field to be marked as downstream, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the downstream lineage type, either Field or Field Set
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedDownstreamField(
@Nonnull Urn schemaFieldUrn,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageDownstreamType type) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString();
} else {
finalType = type.toString();
}
fineGrainedLineageNode));

pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "downstreamType"
+ "/"
+ finalType
+ "/"
+ encodeValueUrn(schemaFieldUrn),
instance.numberNode(finalConfidenceScore)));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ private UrnUtils() {}
@Nonnull
public static DatasetUrn toDatasetUrn(
@Nonnull String platformName, @Nonnull String datasetName, @Nonnull String origin) {
return new DatasetUrn(new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase()));
return new DatasetUrn(
new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class DatahubOpenlineageConfig {
@Builder.Default private final boolean captureColumnLevelLineage = true;
@Builder.Default private final DataJobUrn parentJobUrn = null;
// Previously disabled pending column-level patch support in GMS; now enabled by default
@Builder.Default private final boolean usePatch = false;
@Builder.Default private final boolean usePatch = true;

public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,45 @@ private static StringMap generateCustomProperties(
for (Map.Entry<String, OpenLineage.RunFacet> entry :
event.getRun().getFacets().getAdditionalProperties().entrySet()) {
switch (entry.getKey()) {
case "spark_jobDetails":
if (entry.getValue().getAdditionalProperties().get("jobId") != null) {
customProperties.put(
"jobId",
(String) entry.getValue().getAdditionalProperties().get("jobId").toString());
}
if (entry.getValue().getAdditionalProperties().get("jobDescription") != null) {
customProperties.put(
"jobDescription",
(String) entry.getValue().getAdditionalProperties().get("jobDescription"));
}
if (entry.getValue().getAdditionalProperties().get("jobGroup") != null) {
customProperties.put(
"jobGroup", (String) entry.getValue().getAdditionalProperties().get("jobGroup"));
}
if (entry.getValue().getAdditionalProperties().get("jobCallSite") != null) {
customProperties.put(
"jobCallSite",
(String) entry.getValue().getAdditionalProperties().get("jobCallSite"));
}
case "processing_engine":
if (entry.getValue().getAdditionalProperties().get("processing-engine") != null) {
customProperties.put(
"processing-engine",
(String) entry.getValue().getAdditionalProperties().get("name"));
}
if (entry.getValue().getAdditionalProperties().get("processing-engine-version") != null) {
customProperties.put(
"processing-engine-version",
(String) entry.getValue().getAdditionalProperties().get("version"));
}
if (entry.getValue().getAdditionalProperties().get("openlineage-adapter-version")
!= null) {
customProperties.put(
"openlineage-adapter-version",
(String)
entry.getValue().getAdditionalProperties().get("openlineageAdapterVersion"));
}

case "spark_version":
{
if (entry.getValue().getAdditionalProperties().get("spark-version") != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void generateDataJobInputOutputMcp(
List<MetadataChangeProposal> mcps) {
DataJobInputOutput dataJobInputOutput = new DataJobInputOutput();
log.info("Adding DataJob edges to {}", jobUrn);
if (config.isUsePatch()) {
if (config.isUsePatch() && (!parentJobs.isEmpty() || !inSet.isEmpty() || !outSet.isEmpty())) {
DataJobInputOutputPatchBuilder dataJobInputOutputPatchBuilder =
new DataJobInputOutputPatchBuilder().urn(jobUrn);
for (DatahubDataset dataset : inSet) {
Expand Down Expand Up @@ -263,39 +263,38 @@ private Pair<UrnArray, EdgeArray> processDownstreams(

if (dataset.getLineage() != null) {
if (config.isUsePatch()) {
UpstreamLineagePatchBuilder upstreamLineagePatchBuilder =
new UpstreamLineagePatchBuilder().urn(dataset.getUrn());
for (Upstream upstream : dataset.getLineage().getUpstreams()) {
upstreamLineagePatchBuilder.addUpstream(upstream.getDataset(), upstream.getType());
}

log.info("Adding FineGrainedLineage to {}", dataset.getUrn());
for (FineGrainedLineage fineGrainedLineage :
Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) {
for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) {
upstreamLineagePatchBuilder.addFineGrainedUpstreamField(
upstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
fineGrainedLineage.getUpstreamType());
if (!dataset.getLineage().getUpstreams().isEmpty()) {
UpstreamLineagePatchBuilder upstreamLineagePatchBuilder =
new UpstreamLineagePatchBuilder().urn(dataset.getUrn());
for (Upstream upstream : dataset.getLineage().getUpstreams()) {
upstreamLineagePatchBuilder.addUpstream(
upstream.getDataset(), upstream.getType());
}
for (Urn downstream : Objects.requireNonNull(fineGrainedLineage.getDownstreams())) {
upstreamLineagePatchBuilder.addFineGrainedDownstreamField(
downstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
fineGrainedLineage.getDownstreamType());

log.info("Adding FineGrainedLineage to {}", dataset.getUrn());
for (FineGrainedLineage fineGrainedLineage :
Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) {
for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) {
for (Urn downstream :
Objects.requireNonNull(fineGrainedLineage.getDownstreams())) {
upstreamLineagePatchBuilder.addFineGrainedUpstreamField(
downstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
upstream,
null);
}
}
}
MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build();
log.info(
"upstreamLineagePatch: {}",
mcp.getAspect().getValue().asString(Charset.defaultCharset()));
mcps.add(mcp);
} else {
addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps);
}
MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build();
log.info(
"upstreamLineagePatch: {}",
mcp.getAspect().getValue().asString(Charset.defaultCharset()));
mcps.add(mcp);
} else {
addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps);
}
}
});
Expand Down Expand Up @@ -361,7 +360,7 @@ private void generateFlowGlobalTagsAspect(
DatahubOpenlineageConfig config,
List<MetadataChangeProposal> mcps) {
if (flowGlobalTags != null) {
if (config.isUsePatch()) {
if ((config.isUsePatch() && (!flowGlobalTags.getTags().isEmpty()))) {
GlobalTagsPatchBuilder globalTagsPatchBuilder = new GlobalTagsPatchBuilder().urn(flowUrn);
for (TagAssociation tag : flowGlobalTags.getTags()) {
globalTagsPatchBuilder.addTag(tag.getTag(), null);
Expand Down
14 changes: 7 additions & 7 deletions metadata-integration/java/spark-lineage-beta/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ When running jobs using spark-submit, the agent needs to be configured in the co

```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.3
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```

## spark-submit command line

```sh
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.3 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.6 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```

### Configuration Instructions: Amazon EMR
Expand All @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it
stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)

```text
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.3
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the Datahub access token
Expand All @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh
spark = SparkSession.builder
.master("spark://spark-master:7077")
.appName("test-application")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.3")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.6")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server", "http://localhost:8080")
.enableHiveSupport()
Expand All @@ -79,7 +79,7 @@ appName("test-application")
config("spark.master","spark://spark-master:7077")
.

config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.3")
config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.6")
.

config("spark.extraListeners","datahub.spark.DatahubSparkListener")
Expand Down Expand Up @@ -181,8 +181,8 @@ information like tokens.
| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricsk. You should enable this on Databricks if you want coalesced run . |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled.
| spark.datahub.stage_metadata_coalescing | | | Normally metadata is coalesced and sent at the onApplicationEnd event, which is never called on Databricks. Enable this on Databricks if you want a coalesced run. |
| spark.datahub.patch.enabled | | true | Set this to true to send lineage as a patch, which appends to rather than overwrites existing Dataset lineage edges. Enabled by default. |

## What to Expect: The Metadata Model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,10 @@ private static SparkAppContext getSparkAppContext(

public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
long startTime = System.currentTimeMillis();
initializeContextFactoryIfNotInitialized();

log.info("Application start called");
this.appContext = getSparkAppContext(applicationStart);

initializeContextFactoryIfNotInitialized();
listener.onApplicationStart(applicationStart);
long elapsedTime = System.currentTimeMillis() - startTime;
log.info("onApplicationStart completed successfully in {} ms", elapsedTime);
Expand Down Expand Up @@ -155,7 +154,8 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
return Optional.empty();
}

private synchronized void loadDatahubConfig(SparkAppContext appContext, Properties properties) {
private synchronized SparkLineageConf loadDatahubConfig(
SparkAppContext appContext, Properties properties) {
long startTime = System.currentTimeMillis();
datahubConf = parseSparkConfig();
SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
Expand All @@ -169,14 +169,15 @@ private synchronized void loadDatahubConfig(SparkAppContext appContext, Properti
Optional<Map<String, String>> databricksTags = getDatabricksTags(datahubConf);
this.appContext.setDatabricksTags(databricksTags.orElse(null));
}

log.info("Datahub configuration: {}", datahubConf.root().render());
Optional<DatahubEmitterConfig> restEmitter = initializeEmitter(datahubConf);
SparkLineageConf sparkLineageConf =
SparkLineageConf.toSparkLineageConf(datahubConf, appContext, restEmitter.orElse(null));

emitter.setConfig(sparkLineageConf);
long elapsedTime = System.currentTimeMillis() - startTime;
log.debug("loadDatahubConfig completed successfully in {} ms", elapsedTime);
return sparkLineageConf;
}

public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
Expand Down Expand Up @@ -220,7 +221,6 @@ public void onJobStart(SparkListenerJobStart jobStart) {
initializeContextFactoryIfNotInitialized();

log.debug("Job start called");
loadDatahubConfig(this.appContext, jobStart.properties());
listener.onJobStart(jobStart);
long elapsedTime = System.currentTimeMillis() - startTime;
log.debug("onJobStart completed successfully in {} ms", elapsedTime);
Expand Down Expand Up @@ -333,10 +333,12 @@ private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, Strin
return;
}
try {
SparkLineageConf datahubConfig = loadDatahubConfig(appContext, null);
SparkOpenLineageConfig config = ArgumentParser.parse(sparkConf);
// Needs to be done before initializing OpenLineageClient
initializeMetrics(config);
emitter = new DatahubEventEmitter(config, appName);
emitter.setConfig(datahubConfig);
contextFactory = new ContextFactory(emitter, meterRegistry, config);
circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build();
OpenLineageSparkListener.init(contextFactory);
Expand Down
Loading
Loading