From 89cf828f57dde941e91079bc40f36026cbcef431 Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 21 May 2024 14:32:39 +0200 Subject: [PATCH 1/9] - Bumping OpenLineage version to 0.14.0 - Adding some bugfixes which caused spark lineage fail due to non existing emitter object --- build.gradle | 2 +- .../converter/OpenLineageToDataHub.java | 39 ++++ .../datahub/spark/DatahubSparkListener.java | 12 +- .../agent/util/RemovePathPatternUtils.java | 2 +- .../plan/catalog/IcebergHandler.java | 192 ------------------ 5 files changed, 48 insertions(+), 199 deletions(-) delete mode 100644 metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/catalog/IcebergHandler.java diff --git a/build.gradle b/build.gradle index 5264c1c58313c7..437ac4fbf6f32d 100644 --- a/build.gradle +++ b/build.gradle @@ -54,7 +54,7 @@ buildscript { ext.hazelcastVersion = '5.3.6' ext.ebeanVersion = '12.16.1' ext.googleJavaFormatVersion = '1.18.1' - ext.openLineageVersion = '1.13.1' + ext.openLineageVersion = '1.14.0' ext.logbackClassicJava8 = '1.2.12' ext.docker_registry = 'acryldata' diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java index 59cac8719c303a..1db09306cbdc22 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java @@ -423,6 +423,45 @@ private static StringMap generateCustomProperties( for (Map.Entry entry : event.getRun().getFacets().getAdditionalProperties().entrySet()) { switch (entry.getKey()) { + case "spark_jobDetails": + if (entry.getValue().getAdditionalProperties().get("jobId") != null) { + customProperties.put( + "jobId", + (String) entry.getValue().getAdditionalProperties().get("jobId").toString()); + } + if (entry.getValue().getAdditionalProperties().get("jobDescription") != null) { + customProperties.put( + "jobDescription", + (String) entry.getValue().getAdditionalProperties().get("jobDescription")); + } + if (entry.getValue().getAdditionalProperties().get("jobGroup") != null) { + customProperties.put( + "jobGroup", (String) entry.getValue().getAdditionalProperties().get("jobGroup")); + } + if (entry.getValue().getAdditionalProperties().get("jobCallSite") != null) { + customProperties.put( + "jobCallSite", + (String) entry.getValue().getAdditionalProperties().get("jobCallSite")); + } + case "processing_engine": + if (entry.getValue().getAdditionalProperties().get("processing-engine") != null) { + customProperties.put( + "processing-engine", + (String) entry.getValue().getAdditionalProperties().get("name")); + } + if (entry.getValue().getAdditionalProperties().get("processing-engine-version") != null) { + customProperties.put( + "processing-engine-version", + (String) entry.getValue().getAdditionalProperties().get("version")); + } + if (entry.getValue().getAdditionalProperties().get("openlineage-adapter-version") + != null) { + customProperties.put( + "openlineage-adapter-version", + (String) + entry.getValue().getAdditionalProperties().get("openlineageAdapterVersion")); + } + case "spark_version": { if (entry.getValue().getAdditionalProperties().get("spark-version") != null) { diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java index 38de142c4dd171..54bb3821eddedf 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java @@ -87,11 +87,10 @@ private static SparkAppContext getSparkAppContext( public void onApplicationStart(SparkListenerApplicationStart applicationStart) { long startTime = System.currentTimeMillis(); - initializeContextFactoryIfNotInitialized(); log.info("Application start called"); this.appContext = getSparkAppContext(applicationStart); - + initializeContextFactoryIfNotInitialized(); listener.onApplicationStart(applicationStart); long elapsedTime = System.currentTimeMillis() - startTime; log.info("onApplicationStart completed successfully in {} ms", elapsedTime); @@ -155,7 +154,8 @@ public Optional initializeEmitter(Config sparkConf) { return Optional.empty(); } - private synchronized void loadDatahubConfig(SparkAppContext appContext, Properties properties) { + private synchronized SparkLineageConf loadDatahubConfig( + SparkAppContext appContext, Properties properties) { long startTime = System.currentTimeMillis(); datahubConf = parseSparkConfig(); SparkEnv sparkEnv = SparkEnv$.MODULE$.get(); @@ -169,14 +169,15 @@ private synchronized void loadDatahubConfig(SparkAppContext appContext, Properti Optional> databricksTags = getDatabricksTags(datahubConf); this.appContext.setDatabricksTags(databricksTags.orElse(null)); } + log.info("Datahub configuration: {}", datahubConf.root().render()); Optional restEmitter = initializeEmitter(datahubConf); SparkLineageConf sparkLineageConf = SparkLineageConf.toSparkLineageConf(datahubConf, appContext, restEmitter.orElse(null)); - emitter.setConfig(sparkLineageConf); long elapsedTime = System.currentTimeMillis() - startTime; log.debug("loadDatahubConfig completed successfully in {} ms", elapsedTime); + return sparkLineageConf; } public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { @@ -220,7 +221,6 @@ public void onJobStart(SparkListenerJobStart jobStart) { initializeContextFactoryIfNotInitialized(); log.debug("Job start called"); - loadDatahubConfig(this.appContext, jobStart.properties()); listener.onJobStart(jobStart); long elapsedTime = System.currentTimeMillis() - startTime; log.debug("onJobStart completed successfully in {} ms", elapsedTime); @@ -333,10 +333,12 @@ private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, Strin return; } try { + SparkLineageConf datahubConfig = loadDatahubConfig(appContext, null); SparkOpenLineageConfig config = ArgumentParser.parse(sparkConf); // Needs to be done before initializing OpenLineageClient initializeMetrics(config); emitter = new DatahubEventEmitter(config, appName); + emitter.setConfig(datahubConfig); contextFactory = new ContextFactory(emitter, meterRegistry, config); circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build(); OpenLineageSparkListener.init(contextFactory); diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java index a606a44ddd5160..c44dacf8ff3bea 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java @@ -22,7 +22,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/catalog/IcebergHandler.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/catalog/IcebergHandler.java deleted file mode 100644 index dcd1cf3fb3aff7..00000000000000 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/catalog/IcebergHandler.java +++ /dev/null @@ -1,192 +0,0 @@ -/* -/* Copyright 2018-2023 contributors to the OpenLineage project -/* SPDX-License-Identifier: Apache-2.0 -*/ - -package io.openlineage.spark3.agent.lifecycle.plan.catalog; - -import io.openlineage.client.OpenLineage; -import io.openlineage.client.utils.DatasetIdentifier; -import io.openlineage.spark.agent.util.PathUtils; -import io.openlineage.spark.agent.util.ScalaConversionUtils; -import io.openlineage.spark.agent.util.SparkConfUtils; -import io.openlineage.spark.api.OpenLineageContext; -import java.io.File; -import java.net.URI; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import javax.annotation.Nullable; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.Path; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.spark.SparkCatalog; -import org.apache.iceberg.spark.SparkSessionCatalog; -import org.apache.iceberg.spark.source.SparkTable; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.TableCatalog; - -@Slf4j -public class IcebergHandler implements CatalogHandler { - - private final OpenLineageContext context; - - private static final String TYPE = "type"; - private static final String CATALOG_IMPL = "catalog-impl"; - private static final String IO_IMPL = "io-impl"; - - public IcebergHandler(OpenLineageContext context) { - this.context = context; - } - - @Override - public boolean hasClasses() { - try { - IcebergHandler.class.getClassLoader().loadClass("org.apache.iceberg.catalog.Catalog"); - return true; - } catch (Exception e) { - // swallow- we don't care - } - return false; - } - - @Override - public boolean isClass(TableCatalog tableCatalog) { - return (tableCatalog instanceof SparkCatalog) || (tableCatalog instanceof SparkSessionCatalog); - } - - @Override - public DatasetIdentifier getDatasetIdentifier( - SparkSession session, - TableCatalog tableCatalog, - Identifier identifier, - Map properties) { - String catalogName = tableCatalog.name(); - - String prefix = String.format("spark.sql.catalog.%s", catalogName); - Map conf = - ScalaConversionUtils.fromMap(session.conf().getAll()); - log.info(conf.toString()); - Map catalogConf = - conf.entrySet().stream() - .filter(x -> x.getKey().startsWith(prefix)) - .filter(x -> x.getKey().length() > prefix.length()) - .collect( - Collectors.toMap( - x -> x.getKey().substring(prefix.length() + 1), // handle dot after prefix - Map.Entry::getValue)); - - log.info(catalogConf.toString()); - if (catalogConf.isEmpty() - || (!catalogConf.containsKey(TYPE) - && !catalogConf.get(CATALOG_IMPL).equals("org.apache.iceberg.aws.glue.GlueCatalog"))) { - throw new UnsupportedCatalogException(catalogName); - } - log.info(catalogConf.get(TYPE)); - - String warehouse = catalogConf.get(CatalogProperties.WAREHOUSE_LOCATION); - DatasetIdentifier di; - - if (catalogConf.get(CATALOG_IMPL).equals("org.apache.iceberg.aws.glue.GlueCatalog")) { - di = new DatasetIdentifier(identifier.toString(), "glue"); - log.info("Glue catalog detected, returning glue dataset identifier {}", di); - return di; - } else { - di = PathUtils.fromPath(new Path(warehouse, identifier.toString())); - } - if (catalogConf.get(TYPE).equals("hive")) { - di.withSymlink( - getHiveIdentifier( - session, catalogConf.get(CatalogProperties.URI), identifier.toString())); - } else if (catalogConf.get(TYPE).equals("hadoop")) { - di.withSymlink( - identifier.toString(), - StringUtils.substringBeforeLast( - di.getName(), File.separator), // parent location from a name becomes a namespace - DatasetIdentifier.SymlinkType.TABLE); - } else if (catalogConf.get(TYPE).equals("rest")) { - di.withSymlink( - getRestIdentifier( - session, catalogConf.get(CatalogProperties.URI), identifier.toString())); - } else if (catalogConf.get(TYPE).equals("nessie")) { - di.withSymlink( - getNessieIdentifier( - session, catalogConf.get(CatalogProperties.URI), identifier.toString())); - } - - return di; - } - - @SneakyThrows - private DatasetIdentifier.Symlink getNessieIdentifier( - SparkSession session, @Nullable String confUri, String table) { - - String uri = new URI(confUri).toString(); - return new DatasetIdentifier.Symlink(table, uri, DatasetIdentifier.SymlinkType.TABLE); - } - - @SneakyThrows - private DatasetIdentifier.Symlink getHiveIdentifier( - SparkSession session, @Nullable String confUri, String table) { - String slashPrefixedTable = String.format("/%s", table); - URI uri; - if (confUri == null) { - uri = - SparkConfUtils.getMetastoreUri(session.sparkContext().conf()) - .orElseThrow(() -> new UnsupportedCatalogException("hive")); - } else { - uri = new URI(confUri); - } - DatasetIdentifier metastoreIdentifier = - PathUtils.fromPath( - new Path(PathUtils.enrichHiveMetastoreURIWithTableName(uri, slashPrefixedTable))); - - return new DatasetIdentifier.Symlink( - metastoreIdentifier.getName(), - metastoreIdentifier.getNamespace(), - DatasetIdentifier.SymlinkType.TABLE); - } - - @SneakyThrows - private DatasetIdentifier.Symlink getRestIdentifier( - SparkSession session, @Nullable String confUri, String table) { - - String uri = new URI(confUri).toString(); - return new DatasetIdentifier.Symlink(table, uri, DatasetIdentifier.SymlinkType.TABLE); - } - - @Override - public Optional getStorageDatasetFacet( - Map properties) { - String format = properties.getOrDefault("format", ""); - return Optional.of( - context.getOpenLineage().newStorageDatasetFacet("iceberg", format.replace("iceberg/", ""))); - } - - @SneakyThrows - @Override - public Optional getDatasetVersion( - TableCatalog tableCatalog, Identifier identifier, Map properties) { - SparkTable table; - try { - table = (SparkTable) tableCatalog.loadTable(identifier); - } catch (NoSuchTableException | ClassCastException e) { - log.error("Failed to load table from catalog: {}", identifier, e); - return Optional.empty(); - } - - if (table.table() != null && table.table().currentSnapshot() != null) { - return Optional.of(Long.toString(table.table().currentSnapshot().snapshotId())); - } - return Optional.empty(); - } - - @Override - public String getName() { - return "iceberg"; - } -} From 36951e4cca830e35bc1147890f0d31ec8e86dcde Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 21 May 2024 14:40:05 +0200 Subject: [PATCH 2/9] Adding note to legacy spark lineage plugin --- metadata-integration/java/spark-lineage/README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/metadata-integration/java/spark-lineage/README.md b/metadata-integration/java/spark-lineage/README.md index f56cb14a1ae546..041408aac6d6d3 100644 --- a/metadata-integration/java/spark-lineage/README.md +++ b/metadata-integration/java/spark-lineage/README.md @@ -1,4 +1,10 @@ -# Spark +# Spark (Legacy) + +:::note + +This is our legacy Spark Integration which is replaced by [Acryl Spark Lineage](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta) + +::: To integrate Spark with DataHub, we provide a lightweight Java agent that listens for Spark application and job events and pushes metadata out to DataHub in real-time. The agent listens to events such application start/end, and SQLExecution start/end to create pipelines (i.e. DataJob) and tasks (i.e. DataFlow) in Datahub along with lineage to datasets that are being read from and written to. Read on to learn how to configure this for different Spark scenarios. From b8b1148330849cfde64d58c99addd9a6c0dd09fb Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 22 May 2024 08:40:46 +0200 Subject: [PATCH 3/9] Fixing cll patch support --- .../builder/UpstreamLineagePatchBuilder.java | 89 +------------------ .../config/DatahubOpenlineageConfig.java | 2 +- .../openlineage/dataset/DatahubJob.java | 24 +++-- .../datahub/spark/conf/SparkConfigParser.java | 2 +- 4 files changed, 14 insertions(+), 103 deletions(-) diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java index 40b6de68f7b56a..08182761aeb03f 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java @@ -9,8 +9,6 @@ import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.Urn; import com.linkedin.dataset.DatasetLineageType; -import com.linkedin.dataset.FineGrainedLineageDownstreamType; -import com.linkedin.dataset.FineGrainedLineageUpstreamType; import com.linkedin.metadata.aspect.patch.PatchOperationType; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -57,48 +55,6 @@ public UpstreamLineagePatchBuilder removeUpstream(@Nonnull DatasetUrn datasetUrn return this; } - /** - * Adds a field as a fine grained upstream - * - * @param schemaFieldUrn a schema field to be marked as upstream, format: - * urn:li:schemaField(DATASET_URN, COLUMN NAME) - * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for - * full confidence - * @param transformationOperation string operation type that describes the transformation - * operation happening in the lineage edge - * @param type the upstream lineage type, either Field or Field Set - * @return this builder - */ - public UpstreamLineagePatchBuilder addFineGrainedUpstreamField( - @Nonnull Urn schemaFieldUrn, - @Nullable Float confidenceScore, - @Nonnull String transformationOperation, - @Nullable FineGrainedLineageUpstreamType type) { - Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); - String finalType; - if (type == null) { - // Default to set of fields if not explicitly a single field - finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString(); - } else { - finalType = type.toString(); - } - - pathValues.add( - ImmutableTriple.of( - PatchOperationType.ADD.getValue(), - FINE_GRAINED_PATH_START - + transformationOperation - + "/" - + "upstreamType" - + "/" - + finalType - + "/" - + encodeValueUrn(schemaFieldUrn), - instance.numberNode(finalConfidenceScore))); - - return this; - } - /** * Adds a field as a fine grained upstream * @@ -135,54 +91,13 @@ public UpstreamLineagePatchBuilder addFineGrainedUpstreamField( FINE_GRAINED_PATH_START + transformationOperation + "/" - + downstreamSchemaField + + encodeValueUrn(downstreamSchemaField) + "/" + finalQueryUrn + "/" + encodeValueUrn(upstreamSchemaField), - instance.numberNode(finalConfidenceScore))); - - return this; - } - - /** - * Adds a field as a fine grained downstream - * - * @param schemaFieldUrn a schema field to be marked as downstream, format: - * urn:li:schemaField(DATASET_URN, COLUMN NAME) - * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for - * full confidence - * @param transformationOperation string operation type that describes the transformation - * operation happening in the lineage edge - * @param type the downstream lineage type, either Field or Field Set - * @return this builder - */ - public UpstreamLineagePatchBuilder addFineGrainedDownstreamField( - @Nonnull Urn schemaFieldUrn, - @Nullable Float confidenceScore, - @Nonnull String transformationOperation, - @Nullable FineGrainedLineageDownstreamType type) { - Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); - String finalType; - if (type == null) { - // Default to set of fields if not explicitly a single field - finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString(); - } else { - finalType = type.toString(); - } + fineGrainedLineageNode)); - pathValues.add( - ImmutableTriple.of( - PatchOperationType.ADD.getValue(), - FINE_GRAINED_PATH_START - + transformationOperation - + "/" - + "downstreamType" - + "/" - + finalType - + "/" - + encodeValueUrn(schemaFieldUrn), - instance.numberNode(finalConfidenceScore))); return this; } diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java index b8d4d53511cd0b..5e4b791fa8d82d 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java @@ -27,7 +27,7 @@ public class DatahubOpenlineageConfig { @Builder.Default private final boolean captureColumnLevelLineage = true; @Builder.Default private final DataJobUrn parentJobUrn = null; // This is disabled until column level patch support won't be fixed in GMS - @Builder.Default private final boolean usePatch = false; + @Builder.Default private final boolean usePatch = true; public List getPathSpecsForPlatform(String platform) { if ((pathSpecs == null) || (pathSpecs.isEmpty())) { diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java index 3682a42bb3571c..0525bb00e9666f 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java @@ -273,20 +273,16 @@ private Pair processDownstreams( for (FineGrainedLineage fineGrainedLineage : Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) { for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) { - upstreamLineagePatchBuilder.addFineGrainedUpstreamField( - upstream, - fineGrainedLineage.getConfidenceScore(), - StringUtils.defaultIfEmpty( - fineGrainedLineage.getTransformOperation(), "TRANSFORM"), - fineGrainedLineage.getUpstreamType()); - } - for (Urn downstream : Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { - upstreamLineagePatchBuilder.addFineGrainedDownstreamField( - downstream, - fineGrainedLineage.getConfidenceScore(), - StringUtils.defaultIfEmpty( - fineGrainedLineage.getTransformOperation(), "TRANSFORM"), - fineGrainedLineage.getDownstreamType()); + for (Urn downstream : + Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { + upstreamLineagePatchBuilder.addFineGrainedUpstreamField( + downstream, + fineGrainedLineage.getConfidenceScore(), + StringUtils.defaultIfEmpty( + fineGrainedLineage.getTransformOperation(), "TRANSFORM"), + upstream, + null); + } } } MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build(); diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java index f1af56ff888d3c..d8da5d95935c9a 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -307,7 +307,7 @@ public static boolean isCoalesceEnabled(Config datahubConfig) { public static boolean isPatchEnabled(Config datahubConfig) { if (!datahubConfig.hasPath(PATCH_ENABLED)) { - return false; + return true; } return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED); } From 885e9dac57a5864af1f790298d31d5ffd7b76bed Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 22 May 2024 08:47:40 +0200 Subject: [PATCH 4/9] Fixing typo --- metadata-integration/java/spark-lineage-beta/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-integration/java/spark-lineage-beta/README.md b/metadata-integration/java/spark-lineage-beta/README.md index e09bc3938b6868..a521f1770592b5 100644 --- a/metadata-integration/java/spark-lineage-beta/README.md +++ b/metadata-integration/java/spark-lineage-beta/README.md @@ -181,7 +181,7 @@ information like tokens. | spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` | | spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow | | spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow | -| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricsk. You should enable this on Databricks if you want coalesced run . | +| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks. You should enable this on Databricks if you want coalesced run . | | spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. | From 02d3cf44b662f6895d7a3aa61932059a263ad107 Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 22 May 2024 08:59:54 +0200 Subject: [PATCH 5/9] Fixing spotless check issue --- .../src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java b/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java index 3d97e0d5c99189..89f0cd8fbc9791 100644 --- a/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java +++ b/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java @@ -23,7 +23,8 @@ private UrnUtils() {} @Nonnull public static DatasetUrn toDatasetUrn( @Nonnull String platformName, @Nonnull String datasetName, @Nonnull String origin) { - return new DatasetUrn(new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase())); + return new DatasetUrn( + new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase())); } /** From 327d038aa2ae91a5eebd4f971d3ad770af0089d4 Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 22 May 2024 20:59:20 +0200 Subject: [PATCH 6/9] Fixing empty lineage patch issue --- .../openlineage/dataset/DatahubJob.java | 53 ++++++++++--------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java index 0525bb00e9666f..22fca804264869 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java @@ -263,35 +263,38 @@ private Pair processDownstreams( if (dataset.getLineage() != null) { if (config.isUsePatch()) { - UpstreamLineagePatchBuilder upstreamLineagePatchBuilder = - new UpstreamLineagePatchBuilder().urn(dataset.getUrn()); - for (Upstream upstream : dataset.getLineage().getUpstreams()) { - upstreamLineagePatchBuilder.addUpstream(upstream.getDataset(), upstream.getType()); - } + if (!dataset.getLineage().getUpstreams().isEmpty()) { + UpstreamLineagePatchBuilder upstreamLineagePatchBuilder = + new UpstreamLineagePatchBuilder().urn(dataset.getUrn()); + for (Upstream upstream : dataset.getLineage().getUpstreams()) { + upstreamLineagePatchBuilder.addUpstream( + upstream.getDataset(), upstream.getType()); + } - log.info("Adding FineGrainedLineage to {}", dataset.getUrn()); - for (FineGrainedLineage fineGrainedLineage : - Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) { - for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) { - for (Urn downstream : - Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { - upstreamLineagePatchBuilder.addFineGrainedUpstreamField( - downstream, - fineGrainedLineage.getConfidenceScore(), - StringUtils.defaultIfEmpty( - fineGrainedLineage.getTransformOperation(), "TRANSFORM"), - upstream, - null); + log.info("Adding FineGrainedLineage to {}", dataset.getUrn()); + for (FineGrainedLineage fineGrainedLineage : + Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) { + for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) { + for (Urn downstream : + Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { + upstreamLineagePatchBuilder.addFineGrainedUpstreamField( + downstream, + fineGrainedLineage.getConfidenceScore(), + StringUtils.defaultIfEmpty( + fineGrainedLineage.getTransformOperation(), "TRANSFORM"), + upstream, + null); + } } } + MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build(); + log.info( + "upstreamLineagePatch: {}", + mcp.getAspect().getValue().asString(Charset.defaultCharset())); + mcps.add(mcp); + } else { + addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps); } - MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build(); - log.info( - "upstreamLineagePatch: {}", - mcp.getAspect().getValue().asString(Charset.defaultCharset())); - mcps.add(mcp); - } else { - addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps); } } }); From d0bdc2c64139da6c2d62b1e77a9dc22b40c681ec Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 22 May 2024 21:00:47 +0200 Subject: [PATCH 7/9] Updating version in doc --- metadata-integration/java/spark-lineage-beta/README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/metadata-integration/java/spark-lineage-beta/README.md b/metadata-integration/java/spark-lineage-beta/README.md index a521f1770592b5..83b7e517abdae6 100644 --- a/metadata-integration/java/spark-lineage-beta/README.md +++ b/metadata-integration/java/spark-lineage-beta/README.md @@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co ```text #Configuring DataHub spark agent jar -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.3 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.5 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server http://localhost:8080 ``` @@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080 ## spark-submit command line ```sh -spark-submit --packages io.acryl:acryl-spark-lineage:0.2.3 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py +spark-submit --packages io.acryl:acryl-spark-lineage:0.2.5 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py ``` ### Configuration Instructions: Amazon EMR @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html) ```text -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.3 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.5 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server https://your_datahub_host/gms #If you have authentication set up then you also need to specify the Datahub access token @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh spark = SparkSession.builder .master("spark://spark-master:7077") .appName("test-application") -.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.3") +.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.5") .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.rest.server", "http://localhost:8080") .enableHiveSupport() @@ -79,7 +79,7 @@ appName("test-application") config("spark.master","spark://spark-master:7077") . -config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.3") +config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.5") . config("spark.extraListeners","datahub.spark.DatahubSparkListener") From df3a31f3ffa9f9c5f7362bfcbc48ca52447e6860 Mon Sep 17 00:00:00 2001 From: treff7es Date: Thu, 23 May 2024 21:52:32 +0200 Subject: [PATCH 8/9] Adding more patch fixes --- .../io/datahubproject/openlineage/dataset/DatahubJob.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java index 22fca804264869..5f4a9b6a596e78 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java @@ -174,7 +174,7 @@ private void generateDataJobInputOutputMcp( List mcps) { DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(); log.info("Adding DataJob edges to {}", jobUrn); - if (config.isUsePatch()) { + if (config.isUsePatch() && (!parentJobs.isEmpty() || !inSet.isEmpty() || !outSet.isEmpty())) { DataJobInputOutputPatchBuilder dataJobInputOutputPatchBuilder = new DataJobInputOutputPatchBuilder().urn(jobUrn); for (DatahubDataset dataset : inSet) { @@ -360,7 +360,7 @@ private void generateFlowGlobalTagsAspect( DatahubOpenlineageConfig config, List mcps) { if (flowGlobalTags != null) { - if (config.isUsePatch()) { + if ((config.isUsePatch() && (!flowGlobalTags.getTags().isEmpty()))) { GlobalTagsPatchBuilder globalTagsPatchBuilder = new GlobalTagsPatchBuilder().urn(flowUrn); for (TagAssociation tag : flowGlobalTags.getTags()) { globalTagsPatchBuilder.addTag(tag.getTag(), null); From 9b388f06130eafcc40e000ecefa257075b1e50b0 Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 29 May 2024 22:20:41 +0200 Subject: [PATCH 9/9] Fixing doc --- .../java/spark-lineage-beta/README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/metadata-integration/java/spark-lineage-beta/README.md b/metadata-integration/java/spark-lineage-beta/README.md index 83b7e517abdae6..7b3598453498f7 100644 --- a/metadata-integration/java/spark-lineage-beta/README.md +++ b/metadata-integration/java/spark-lineage-beta/README.md @@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co ```text #Configuring DataHub spark agent jar -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.5 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server http://localhost:8080 ``` @@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080 ## spark-submit command line ```sh -spark-submit --packages io.acryl:acryl-spark-lineage:0.2.5 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py +spark-submit --packages io.acryl:acryl-spark-lineage:0.2.6 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py ``` ### Configuration Instructions: Amazon EMR @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html) ```text -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.5 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server https://your_datahub_host/gms #If you have authentication set up then you also need to specify the Datahub access token @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh spark = SparkSession.builder .master("spark://spark-master:7077") .appName("test-application") -.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.5") +.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.6") .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.rest.server", "http://localhost:8080") .enableHiveSupport() @@ -79,7 +79,7 @@ appName("test-application") config("spark.master","spark://spark-master:7077") . -config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.5") +config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.6") . config("spark.extraListeners","datahub.spark.DatahubSparkListener") @@ -182,7 +182,7 @@ information like tokens. | spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow | | spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow | | spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks. You should enable this on Databricks if you want coalesced run . | -| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. +| spark.datahub.patch.enabled | | true | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. | ## What to Expect: The Metadata Model