fix(ingest/spark): Bumping OpenLineage version to 1.14.0 #10559

Merged · 10 commits · May 29, 2024
Changes from 6 commits
2 changes: 1 addition & 1 deletion build.gradle
@@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.13.1'
ext.openLineageVersion = '1.14.0'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
@@ -9,8 +9,6 @@
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.dataset.FineGrainedLineageDownstreamType;
import com.linkedin.dataset.FineGrainedLineageUpstreamType;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -57,48 +55,6 @@ public UpstreamLineagePatchBuilder removeUpstream(@Nonnull DatasetUrn datasetUrn
return this;
}

/**
* Adds a field as a fine grained upstream
*
* @param schemaFieldUrn a schema field to be marked as upstream, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the upstream lineage type, either Field or Field Set
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedUpstreamField(
@Nonnull Urn schemaFieldUrn,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageUpstreamType type) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString();
} else {
finalType = type.toString();
}

pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "upstreamType"
+ "/"
+ finalType
+ "/"
+ encodeValueUrn(schemaFieldUrn),
instance.numberNode(finalConfidenceScore)));

return this;
}

/**
* Adds a field as a fine grained upstream
*
@@ -135,54 +91,13 @@ public UpstreamLineagePatchBuilder addFineGrainedUpstreamField(
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ downstreamSchemaField
+ encodeValueUrn(downstreamSchemaField)
+ "/"
+ finalQueryUrn
+ "/"
+ encodeValueUrn(upstreamSchemaField),
instance.numberNode(finalConfidenceScore)));

return this;
}

/**
* Adds a field as a fine grained downstream
*
* @param schemaFieldUrn a schema field to be marked as downstream, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the downstream lineage type, either Field or Field Set
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedDownstreamField(
@Nonnull Urn schemaFieldUrn,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageDownstreamType type) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString();
} else {
finalType = type.toString();
}

pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "downstreamType"
+ "/"
+ finalType
+ "/"
+ encodeValueUrn(schemaFieldUrn),
instance.numberNode(finalConfidenceScore)));
return this;
}
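For orientation, the overload that survives is the one invoked from processDownstreams further down in this diff. A minimal, hedged usage sketch (the dataset and schema-field URNs are invented, and the argument order is assumed from that call site: downstream field, confidence score, transform operation, upstream field, query URN):

// Illustrative sketch only; URNs are made up, UrnUtils.getUrn is assumed for parsing them,
// and the builder chain assumes the usual urn(...) / build() methods on the patch builder.
DatasetUrn dataset = UrnUtils.toDatasetUrn("hive", "db.table", "PROD");
Urn downstreamField =
    UrnUtils.getUrn(
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD),col_out)");
Urn upstreamField =
    UrnUtils.getUrn(
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db.src,PROD),col_in)");
MetadataChangeProposal mcp =
    new UpstreamLineagePatchBuilder()
        .urn(dataset)
        .addFineGrainedUpstreamField(downstreamField, 1.0f, "TRANSFORM", upstreamField, null)
        .build();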

@@ -23,7 +23,8 @@ private UrnUtils() {}
@Nonnull
public static DatasetUrn toDatasetUrn(
@Nonnull String platformName, @Nonnull String datasetName, @Nonnull String origin) {
return new DatasetUrn(new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase()));
return new DatasetUrn(
new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase()));
}
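For reference, a hedged example of calling this helper (the platform, dataset name, and origin values are illustrative; the origin string is upper-cased into a FabricType):

// Illustrative only: "snowflake", "analytics.public.orders", and "prod" are example values.
// "prod".toUpperCase() resolves to FabricType.PROD via FabricType.valueOf(...).
DatasetUrn urn = UrnUtils.toDatasetUrn("snowflake", "analytics.public.orders", "prod");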

/**
@@ -27,7 +27,7 @@ public class DatahubOpenlineageConfig {
@Builder.Default private final boolean captureColumnLevelLineage = true;
@Builder.Default private final DataJobUrn parentJobUrn = null;
// This is disabled until column level patch support won't be fixed in GMS
@Builder.Default private final boolean usePatch = false;
@Builder.Default private final boolean usePatch = true;

public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {
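Since usePatch now defaults to true, callers that relied on the previous overwrite behaviour would need to opt out explicitly. A hedged sketch using the Lombok builder implied by the @Builder.Default fields above (field names are taken from this class; other builder fields are omitted):

// Sketch only: assumes the class carries Lombok @Builder, as the @Builder.Default
// annotations above suggest; only the two fields of interest are set here.
DatahubOpenlineageConfig config =
    DatahubOpenlineageConfig.builder()
        .captureColumnLevelLineage(true)
        .usePatch(false) // restore the previous overwrite-style lineage emission
        .build();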
@@ -423,6 +423,45 @@ private static StringMap generateCustomProperties(
for (Map.Entry<String, OpenLineage.RunFacet> entry :
event.getRun().getFacets().getAdditionalProperties().entrySet()) {
switch (entry.getKey()) {
case "spark_jobDetails":
if (entry.getValue().getAdditionalProperties().get("jobId") != null) {
customProperties.put(
"jobId",
(String) entry.getValue().getAdditionalProperties().get("jobId").toString());
}
if (entry.getValue().getAdditionalProperties().get("jobDescription") != null) {
customProperties.put(
"jobDescription",
(String) entry.getValue().getAdditionalProperties().get("jobDescription"));
}
if (entry.getValue().getAdditionalProperties().get("jobGroup") != null) {
customProperties.put(
"jobGroup", (String) entry.getValue().getAdditionalProperties().get("jobGroup"));
}
if (entry.getValue().getAdditionalProperties().get("jobCallSite") != null) {
customProperties.put(
"jobCallSite",
(String) entry.getValue().getAdditionalProperties().get("jobCallSite"));
}
case "processing_engine":
if (entry.getValue().getAdditionalProperties().get("processing-engine") != null) {
customProperties.put(
"processing-engine",
(String) entry.getValue().getAdditionalProperties().get("name"));
}
if (entry.getValue().getAdditionalProperties().get("processing-engine-version") != null) {
customProperties.put(
"processing-engine-version",
(String) entry.getValue().getAdditionalProperties().get("version"));
}
if (entry.getValue().getAdditionalProperties().get("openlineage-adapter-version")
!= null) {
customProperties.put(
"openlineage-adapter-version",
(String)
entry.getValue().getAdditionalProperties().get("openlineageAdapterVersion"));
}

case "spark_version":
{
if (entry.getValue().getAdditionalProperties().get("spark-version") != null) {
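The facet handling above is dense, so here is a minimal standalone sketch of how one of these lookups reads from an OpenLineage run event (variable names are illustrative; the accessor chain and the facet/key names are taken from the code above):

// Illustrative only: `event` is the OpenLineage run event and `customProperties` the StringMap
// being built in generateCustomProperties above.
OpenLineage.RunFacet jobDetails =
    event.getRun().getFacets().getAdditionalProperties().get("spark_jobDetails");
if (jobDetails != null && jobDetails.getAdditionalProperties().get("jobId") != null) {
  customProperties.put("jobId", jobDetails.getAdditionalProperties().get("jobId").toString());
}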
@@ -273,20 +273,16 @@ private Pair<UrnArray, EdgeArray> processDownstreams(
for (FineGrainedLineage fineGrainedLineage :
Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) {
for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) {
upstreamLineagePatchBuilder.addFineGrainedUpstreamField(
upstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
fineGrainedLineage.getUpstreamType());
}
for (Urn downstream : Objects.requireNonNull(fineGrainedLineage.getDownstreams())) {
upstreamLineagePatchBuilder.addFineGrainedDownstreamField(
downstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
fineGrainedLineage.getDownstreamType());
for (Urn downstream :
Objects.requireNonNull(fineGrainedLineage.getDownstreams())) {
upstreamLineagePatchBuilder.addFineGrainedUpstreamField(
downstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
upstream,
null);
}
}
}
MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build();
2 changes: 1 addition & 1 deletion metadata-integration/java/spark-lineage-beta/README.md
@@ -181,7 +181,7 @@ information like tokens.
| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricsk. You should enable this on Databricks if you want coalesced run . |
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks. You should enable this on Databricks if you want coalesced run . |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. |
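As an aside, a hedged example of wiring these options from code rather than spark-defaults (the keys are taken from the table above; the values are examples, not the shipped defaults):

// Illustrative only: configuration keys come from the table above; all other
// spark-submit / cluster settings are omitted.
SparkConf conf = new SparkConf()
    .set("spark.datahub.stage_metadata_coalescing", "true") // coalesce runs on Databricks
    .set("spark.datahub.patch.enabled", "true");            // append lineage edges as a patch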

@@ -87,11 +87,10 @@ private static SparkAppContext getSparkAppContext(

public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
long startTime = System.currentTimeMillis();
initializeContextFactoryIfNotInitialized();

log.info("Application start called");
this.appContext = getSparkAppContext(applicationStart);

initializeContextFactoryIfNotInitialized();
listener.onApplicationStart(applicationStart);
long elapsedTime = System.currentTimeMillis() - startTime;
log.info("onApplicationStart completed successfully in {} ms", elapsedTime);
@@ -155,7 +154,8 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
return Optional.empty();
}

private synchronized void loadDatahubConfig(SparkAppContext appContext, Properties properties) {
private synchronized SparkLineageConf loadDatahubConfig(
SparkAppContext appContext, Properties properties) {
long startTime = System.currentTimeMillis();
datahubConf = parseSparkConfig();
SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
@@ -169,14 +169,15 @@ private synchronized void loadDatahubConfig(SparkAppContext appContext, Properti
Optional<Map<String, String>> databricksTags = getDatabricksTags(datahubConf);
this.appContext.setDatabricksTags(databricksTags.orElse(null));
}

log.info("Datahub configuration: {}", datahubConf.root().render());
Optional<DatahubEmitterConfig> restEmitter = initializeEmitter(datahubConf);
SparkLineageConf sparkLineageConf =
SparkLineageConf.toSparkLineageConf(datahubConf, appContext, restEmitter.orElse(null));

emitter.setConfig(sparkLineageConf);
long elapsedTime = System.currentTimeMillis() - startTime;
log.debug("loadDatahubConfig completed successfully in {} ms", elapsedTime);
return sparkLineageConf;
}

public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
@@ -220,7 +221,6 @@ public void onJobStart(SparkListenerJobStart jobStart) {
initializeContextFactoryIfNotInitialized();

log.debug("Job start called");
loadDatahubConfig(this.appContext, jobStart.properties());
listener.onJobStart(jobStart);
long elapsedTime = System.currentTimeMillis() - startTime;
log.debug("onJobStart completed successfully in {} ms", elapsedTime);
@@ -333,10 +333,12 @@ private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, Strin
return;
}
try {
SparkLineageConf datahubConfig = loadDatahubConfig(appContext, null);
SparkOpenLineageConfig config = ArgumentParser.parse(sparkConf);
// Needs to be done before initializing OpenLineageClient
initializeMetrics(config);
emitter = new DatahubEventEmitter(config, appName);
emitter.setConfig(datahubConfig);
contextFactory = new ContextFactory(emitter, meterRegistry, config);
circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build();
OpenLineageSparkListener.init(contextFactory);
@@ -307,7 +307,7 @@ public static boolean isCoalesceEnabled(Config datahubConfig) {

public static boolean isPatchEnabled(Config datahubConfig) {
if (!datahubConfig.hasPath(PATCH_ENABLED)) {
return false;
return true;
}
return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED);
}
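Given the flipped default, a hedged illustration of the resulting behaviour (PATCH_ENABLED is assumed to resolve to the "patch.enabled" key implied by the README table above; com.typesafe.config is the Config API used by this parser):

// Sketch only: the "patch.enabled" key is an assumption, and the calls are written as if
// made from inside SparkConfigParser, where isPatchEnabled is defined.
Config explicitOptOut = ConfigFactory.parseMap(Collections.singletonMap("patch.enabled", false));
boolean enabledWhenUnset = isPatchEnabled(ConfigFactory.empty()); // true: the new default
boolean enabledWhenFalse = isPatchEnabled(explicitOptOut);        // false: explicit opt-out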
@@ -22,7 +22,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
