From d78287cc578a1f32629f4fdbc423ab47f74b674d Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Wed, 29 May 2024 22:58:55 +0200 Subject: [PATCH] fix(ingest/spark): Bumping OpenLineage version to 0.14.0 (#10559) --- build.gradle | 2 +- .../builder/UpstreamLineagePatchBuilder.java | 89 +------- .../config/DatahubOpenlineageConfig.java | 2 +- .../converter/OpenLineageToDataHub.java | 39 ++++ .../openlineage/dataset/DatahubJob.java | 63 +++--- .../java/spark-lineage-beta/README.md | 14 +- .../datahub/spark/DatahubSparkListener.java | 12 +- .../datahub/spark/conf/SparkConfigParser.java | 2 +- .../agent/util/RemovePathPatternUtils.java | 2 +- .../plan/catalog/IcebergHandler.java | 192 ------------------ .../java/spark-lineage/README.md | 8 +- 11 files changed, 97 insertions(+), 328 deletions(-) delete mode 100644 metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/catalog/IcebergHandler.java diff --git a/build.gradle b/build.gradle index 5264c1c58313c7..437ac4fbf6f32d 100644 --- a/build.gradle +++ b/build.gradle @@ -54,7 +54,7 @@ buildscript { ext.hazelcastVersion = '5.3.6' ext.ebeanVersion = '12.16.1' ext.googleJavaFormatVersion = '1.18.1' - ext.openLineageVersion = '1.13.1' + ext.openLineageVersion = '1.14.0' ext.logbackClassicJava8 = '1.2.12' ext.docker_registry = 'acryldata' diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java index 40b6de68f7b56a..08182761aeb03f 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java @@ -9,8 +9,6 @@ import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.Urn; import com.linkedin.dataset.DatasetLineageType; -import com.linkedin.dataset.FineGrainedLineageDownstreamType; -import com.linkedin.dataset.FineGrainedLineageUpstreamType; import com.linkedin.metadata.aspect.patch.PatchOperationType; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -57,48 +55,6 @@ public UpstreamLineagePatchBuilder removeUpstream(@Nonnull DatasetUrn datasetUrn return this; } - /** - * Adds a field as a fine grained upstream - * - * @param schemaFieldUrn a schema field to be marked as upstream, format: - * urn:li:schemaField(DATASET_URN, COLUMN NAME) - * @param confidenceScore optional, confidence score for the lineage edge. 
Defaults to 1.0 for - * full confidence - * @param transformationOperation string operation type that describes the transformation - * operation happening in the lineage edge - * @param type the upstream lineage type, either Field or Field Set - * @return this builder - */ - public UpstreamLineagePatchBuilder addFineGrainedUpstreamField( - @Nonnull Urn schemaFieldUrn, - @Nullable Float confidenceScore, - @Nonnull String transformationOperation, - @Nullable FineGrainedLineageUpstreamType type) { - Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); - String finalType; - if (type == null) { - // Default to set of fields if not explicitly a single field - finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString(); - } else { - finalType = type.toString(); - } - - pathValues.add( - ImmutableTriple.of( - PatchOperationType.ADD.getValue(), - FINE_GRAINED_PATH_START - + transformationOperation - + "/" - + "upstreamType" - + "/" - + finalType - + "/" - + encodeValueUrn(schemaFieldUrn), - instance.numberNode(finalConfidenceScore))); - - return this; - } - /** * Adds a field as a fine grained upstream * @@ -135,54 +91,13 @@ public UpstreamLineagePatchBuilder addFineGrainedUpstreamField( FINE_GRAINED_PATH_START + transformationOperation + "/" - + downstreamSchemaField + + encodeValueUrn(downstreamSchemaField) + "/" + finalQueryUrn + "/" + encodeValueUrn(upstreamSchemaField), - instance.numberNode(finalConfidenceScore))); - - return this; - } - - /** - * Adds a field as a fine grained downstream - * - * @param schemaFieldUrn a schema field to be marked as downstream, format: - * urn:li:schemaField(DATASET_URN, COLUMN NAME) - * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for - * full confidence - * @param transformationOperation string operation type that describes the transformation - * operation happening in the lineage edge - * @param type the downstream lineage type, either Field or Field Set - * @return this builder - */ - public UpstreamLineagePatchBuilder addFineGrainedDownstreamField( - @Nonnull Urn schemaFieldUrn, - @Nullable Float confidenceScore, - @Nonnull String transformationOperation, - @Nullable FineGrainedLineageDownstreamType type) { - Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); - String finalType; - if (type == null) { - // Default to set of fields if not explicitly a single field - finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString(); - } else { - finalType = type.toString(); - } + fineGrainedLineageNode)); - pathValues.add( - ImmutableTriple.of( - PatchOperationType.ADD.getValue(), - FINE_GRAINED_PATH_START - + transformationOperation - + "/" - + "downstreamType" - + "/" - + finalType - + "/" - + encodeValueUrn(schemaFieldUrn), - instance.numberNode(finalConfidenceScore))); return this; } diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java index b8d4d53511cd0b..5e4b791fa8d82d 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java @@ -27,7 +27,7 @@ public class DatahubOpenlineageConfig { @Builder.Default private final boolean 
captureColumnLevelLineage = true; @Builder.Default private final DataJobUrn parentJobUrn = null; // This is disabled until column level patch support won't be fixed in GMS - @Builder.Default private final boolean usePatch = false; + @Builder.Default private final boolean usePatch = true; public List getPathSpecsForPlatform(String platform) { if ((pathSpecs == null) || (pathSpecs.isEmpty())) { diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java index 59cac8719c303a..1db09306cbdc22 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java @@ -423,6 +423,45 @@ private static StringMap generateCustomProperties( for (Map.Entry entry : event.getRun().getFacets().getAdditionalProperties().entrySet()) { switch (entry.getKey()) { + case "spark_jobDetails": + if (entry.getValue().getAdditionalProperties().get("jobId") != null) { + customProperties.put( + "jobId", + (String) entry.getValue().getAdditionalProperties().get("jobId").toString()); + } + if (entry.getValue().getAdditionalProperties().get("jobDescription") != null) { + customProperties.put( + "jobDescription", + (String) entry.getValue().getAdditionalProperties().get("jobDescription")); + } + if (entry.getValue().getAdditionalProperties().get("jobGroup") != null) { + customProperties.put( + "jobGroup", (String) entry.getValue().getAdditionalProperties().get("jobGroup")); + } + if (entry.getValue().getAdditionalProperties().get("jobCallSite") != null) { + customProperties.put( + "jobCallSite", + (String) entry.getValue().getAdditionalProperties().get("jobCallSite")); + } + case "processing_engine": + if (entry.getValue().getAdditionalProperties().get("processing-engine") != null) { + customProperties.put( + "processing-engine", + (String) entry.getValue().getAdditionalProperties().get("name")); + } + if (entry.getValue().getAdditionalProperties().get("processing-engine-version") != null) { + customProperties.put( + "processing-engine-version", + (String) entry.getValue().getAdditionalProperties().get("version")); + } + if (entry.getValue().getAdditionalProperties().get("openlineage-adapter-version") + != null) { + customProperties.put( + "openlineage-adapter-version", + (String) + entry.getValue().getAdditionalProperties().get("openlineageAdapterVersion")); + } + case "spark_version": { if (entry.getValue().getAdditionalProperties().get("spark-version") != null) { diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java index 3682a42bb3571c..5f4a9b6a596e78 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java @@ -174,7 +174,7 @@ private void generateDataJobInputOutputMcp( List mcps) { DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(); log.info("Adding DataJob edges to {}", jobUrn); - if (config.isUsePatch()) { + if 
(config.isUsePatch() && (!parentJobs.isEmpty() || !inSet.isEmpty() || !outSet.isEmpty())) { DataJobInputOutputPatchBuilder dataJobInputOutputPatchBuilder = new DataJobInputOutputPatchBuilder().urn(jobUrn); for (DatahubDataset dataset : inSet) { @@ -263,39 +263,38 @@ private Pair processDownstreams( if (dataset.getLineage() != null) { if (config.isUsePatch()) { - UpstreamLineagePatchBuilder upstreamLineagePatchBuilder = - new UpstreamLineagePatchBuilder().urn(dataset.getUrn()); - for (Upstream upstream : dataset.getLineage().getUpstreams()) { - upstreamLineagePatchBuilder.addUpstream(upstream.getDataset(), upstream.getType()); - } - - log.info("Adding FineGrainedLineage to {}", dataset.getUrn()); - for (FineGrainedLineage fineGrainedLineage : - Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) { - for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) { - upstreamLineagePatchBuilder.addFineGrainedUpstreamField( - upstream, - fineGrainedLineage.getConfidenceScore(), - StringUtils.defaultIfEmpty( - fineGrainedLineage.getTransformOperation(), "TRANSFORM"), - fineGrainedLineage.getUpstreamType()); + if (!dataset.getLineage().getUpstreams().isEmpty()) { + UpstreamLineagePatchBuilder upstreamLineagePatchBuilder = + new UpstreamLineagePatchBuilder().urn(dataset.getUrn()); + for (Upstream upstream : dataset.getLineage().getUpstreams()) { + upstreamLineagePatchBuilder.addUpstream( + upstream.getDataset(), upstream.getType()); } - for (Urn downstream : Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { - upstreamLineagePatchBuilder.addFineGrainedDownstreamField( - downstream, - fineGrainedLineage.getConfidenceScore(), - StringUtils.defaultIfEmpty( - fineGrainedLineage.getTransformOperation(), "TRANSFORM"), - fineGrainedLineage.getDownstreamType()); + + log.info("Adding FineGrainedLineage to {}", dataset.getUrn()); + for (FineGrainedLineage fineGrainedLineage : + Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) { + for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) { + for (Urn downstream : + Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { + upstreamLineagePatchBuilder.addFineGrainedUpstreamField( + downstream, + fineGrainedLineage.getConfidenceScore(), + StringUtils.defaultIfEmpty( + fineGrainedLineage.getTransformOperation(), "TRANSFORM"), + upstream, + null); + } + } } + MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build(); + log.info( + "upstreamLineagePatch: {}", + mcp.getAspect().getValue().asString(Charset.defaultCharset())); + mcps.add(mcp); + } else { + addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps); } - MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build(); - log.info( - "upstreamLineagePatch: {}", - mcp.getAspect().getValue().asString(Charset.defaultCharset())); - mcps.add(mcp); - } else { - addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps); } } }); @@ -361,7 +360,7 @@ private void generateFlowGlobalTagsAspect( DatahubOpenlineageConfig config, List mcps) { if (flowGlobalTags != null) { - if (config.isUsePatch()) { + if ((config.isUsePatch() && (!flowGlobalTags.getTags().isEmpty()))) { GlobalTagsPatchBuilder globalTagsPatchBuilder = new GlobalTagsPatchBuilder().urn(flowUrn); for (TagAssociation tag : flowGlobalTags.getTags()) { globalTagsPatchBuilder.addTag(tag.getTag(), null); diff --git a/metadata-integration/java/spark-lineage-beta/README.md 
b/metadata-integration/java/spark-lineage-beta/README.md index e09bc3938b6868..7b3598453498f7 100644 --- a/metadata-integration/java/spark-lineage-beta/README.md +++ b/metadata-integration/java/spark-lineage-beta/README.md @@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co ```text #Configuring DataHub spark agent jar -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.3 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server http://localhost:8080 ``` @@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080 ## spark-submit command line ```sh -spark-submit --packages io.acryl:acryl-spark-lineage:0.2.3 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py +spark-submit --packages io.acryl:acryl-spark-lineage:0.2.6 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py ``` ### Configuration Instructions: Amazon EMR @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html) ```text -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.3 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.6 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server https://your_datahub_host/gms #If you have authentication set up then you also need to specify the Datahub access token @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh spark = SparkSession.builder .master("spark://spark-master:7077") .appName("test-application") -.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.3") +.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.6") .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.rest.server", "http://localhost:8080") .enableHiveSupport() @@ -79,7 +79,7 @@ appName("test-application") config("spark.master","spark://spark-master:7077") . -config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.3") +config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.6") . config("spark.extraListeners","datahub.spark.DatahubSparkListener") @@ -181,8 +181,8 @@ information like tokens. | spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` | | spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow | | spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow | -| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricsk. You should enable this on Databricks if you want coalesced run . | -| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. +| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks. You should enable this on Databricks if you want coalesced run . | +| spark.datahub.patch.enabled | | true | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. 
| ## What to Expect: The Metadata Model diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java index 38de142c4dd171..54bb3821eddedf 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java @@ -87,11 +87,10 @@ private static SparkAppContext getSparkAppContext( public void onApplicationStart(SparkListenerApplicationStart applicationStart) { long startTime = System.currentTimeMillis(); - initializeContextFactoryIfNotInitialized(); log.info("Application start called"); this.appContext = getSparkAppContext(applicationStart); - + initializeContextFactoryIfNotInitialized(); listener.onApplicationStart(applicationStart); long elapsedTime = System.currentTimeMillis() - startTime; log.info("onApplicationStart completed successfully in {} ms", elapsedTime); @@ -155,7 +154,8 @@ public Optional initializeEmitter(Config sparkConf) { return Optional.empty(); } - private synchronized void loadDatahubConfig(SparkAppContext appContext, Properties properties) { + private synchronized SparkLineageConf loadDatahubConfig( + SparkAppContext appContext, Properties properties) { long startTime = System.currentTimeMillis(); datahubConf = parseSparkConfig(); SparkEnv sparkEnv = SparkEnv$.MODULE$.get(); @@ -169,14 +169,15 @@ private synchronized void loadDatahubConfig(SparkAppContext appContext, Properti Optional> databricksTags = getDatabricksTags(datahubConf); this.appContext.setDatabricksTags(databricksTags.orElse(null)); } + log.info("Datahub configuration: {}", datahubConf.root().render()); Optional restEmitter = initializeEmitter(datahubConf); SparkLineageConf sparkLineageConf = SparkLineageConf.toSparkLineageConf(datahubConf, appContext, restEmitter.orElse(null)); - emitter.setConfig(sparkLineageConf); long elapsedTime = System.currentTimeMillis() - startTime; log.debug("loadDatahubConfig completed successfully in {} ms", elapsedTime); + return sparkLineageConf; } public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { @@ -220,7 +221,6 @@ public void onJobStart(SparkListenerJobStart jobStart) { initializeContextFactoryIfNotInitialized(); log.debug("Job start called"); - loadDatahubConfig(this.appContext, jobStart.properties()); listener.onJobStart(jobStart); long elapsedTime = System.currentTimeMillis() - startTime; log.debug("onJobStart completed successfully in {} ms", elapsedTime); @@ -333,10 +333,12 @@ private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, Strin return; } try { + SparkLineageConf datahubConfig = loadDatahubConfig(appContext, null); SparkOpenLineageConfig config = ArgumentParser.parse(sparkConf); // Needs to be done before initializing OpenLineageClient initializeMetrics(config); emitter = new DatahubEventEmitter(config, appName); + emitter.setConfig(datahubConfig); contextFactory = new ContextFactory(emitter, meterRegistry, config); circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build(); OpenLineageSparkListener.init(contextFactory); diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java index f1af56ff888d3c..d8da5d95935c9a 100644 --- 
a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -307,7 +307,7 @@ public static boolean isCoalesceEnabled(Config datahubConfig) { public static boolean isPatchEnabled(Config datahubConfig) { if (!datahubConfig.hasPath(PATCH_ENABLED)) { - return false; + return true; } return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED); } diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java index a606a44ddd5160..c44dacf8ff3bea 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java @@ -22,7 +22,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/catalog/IcebergHandler.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/catalog/IcebergHandler.java deleted file mode 100644 index dcd1cf3fb3aff7..00000000000000 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/catalog/IcebergHandler.java +++ /dev/null @@ -1,192 +0,0 @@ -/* -/* Copyright 2018-2023 contributors to the OpenLineage project -/* SPDX-License-Identifier: Apache-2.0 -*/ - -package io.openlineage.spark3.agent.lifecycle.plan.catalog; - -import io.openlineage.client.OpenLineage; -import io.openlineage.client.utils.DatasetIdentifier; -import io.openlineage.spark.agent.util.PathUtils; -import io.openlineage.spark.agent.util.ScalaConversionUtils; -import io.openlineage.spark.agent.util.SparkConfUtils; -import io.openlineage.spark.api.OpenLineageContext; -import java.io.File; -import java.net.URI; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import javax.annotation.Nullable; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.Path; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.spark.SparkCatalog; -import org.apache.iceberg.spark.SparkSessionCatalog; -import org.apache.iceberg.spark.source.SparkTable; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.TableCatalog; - -@Slf4j -public class IcebergHandler implements CatalogHandler { - - private final OpenLineageContext context; - - private static final String TYPE = "type"; - private static final String CATALOG_IMPL = "catalog-impl"; - private static final String IO_IMPL = "io-impl"; - - public IcebergHandler(OpenLineageContext context) { - this.context = context; - } - - @Override - public boolean hasClasses() { - try { - 
IcebergHandler.class.getClassLoader().loadClass("org.apache.iceberg.catalog.Catalog"); - return true; - } catch (Exception e) { - // swallow- we don't care - } - return false; - } - - @Override - public boolean isClass(TableCatalog tableCatalog) { - return (tableCatalog instanceof SparkCatalog) || (tableCatalog instanceof SparkSessionCatalog); - } - - @Override - public DatasetIdentifier getDatasetIdentifier( - SparkSession session, - TableCatalog tableCatalog, - Identifier identifier, - Map properties) { - String catalogName = tableCatalog.name(); - - String prefix = String.format("spark.sql.catalog.%s", catalogName); - Map conf = - ScalaConversionUtils.fromMap(session.conf().getAll()); - log.info(conf.toString()); - Map catalogConf = - conf.entrySet().stream() - .filter(x -> x.getKey().startsWith(prefix)) - .filter(x -> x.getKey().length() > prefix.length()) - .collect( - Collectors.toMap( - x -> x.getKey().substring(prefix.length() + 1), // handle dot after prefix - Map.Entry::getValue)); - - log.info(catalogConf.toString()); - if (catalogConf.isEmpty() - || (!catalogConf.containsKey(TYPE) - && !catalogConf.get(CATALOG_IMPL).equals("org.apache.iceberg.aws.glue.GlueCatalog"))) { - throw new UnsupportedCatalogException(catalogName); - } - log.info(catalogConf.get(TYPE)); - - String warehouse = catalogConf.get(CatalogProperties.WAREHOUSE_LOCATION); - DatasetIdentifier di; - - if (catalogConf.get(CATALOG_IMPL).equals("org.apache.iceberg.aws.glue.GlueCatalog")) { - di = new DatasetIdentifier(identifier.toString(), "glue"); - log.info("Glue catalog detected, returning glue dataset identifier {}", di); - return di; - } else { - di = PathUtils.fromPath(new Path(warehouse, identifier.toString())); - } - if (catalogConf.get(TYPE).equals("hive")) { - di.withSymlink( - getHiveIdentifier( - session, catalogConf.get(CatalogProperties.URI), identifier.toString())); - } else if (catalogConf.get(TYPE).equals("hadoop")) { - di.withSymlink( - identifier.toString(), - StringUtils.substringBeforeLast( - di.getName(), File.separator), // parent location from a name becomes a namespace - DatasetIdentifier.SymlinkType.TABLE); - } else if (catalogConf.get(TYPE).equals("rest")) { - di.withSymlink( - getRestIdentifier( - session, catalogConf.get(CatalogProperties.URI), identifier.toString())); - } else if (catalogConf.get(TYPE).equals("nessie")) { - di.withSymlink( - getNessieIdentifier( - session, catalogConf.get(CatalogProperties.URI), identifier.toString())); - } - - return di; - } - - @SneakyThrows - private DatasetIdentifier.Symlink getNessieIdentifier( - SparkSession session, @Nullable String confUri, String table) { - - String uri = new URI(confUri).toString(); - return new DatasetIdentifier.Symlink(table, uri, DatasetIdentifier.SymlinkType.TABLE); - } - - @SneakyThrows - private DatasetIdentifier.Symlink getHiveIdentifier( - SparkSession session, @Nullable String confUri, String table) { - String slashPrefixedTable = String.format("/%s", table); - URI uri; - if (confUri == null) { - uri = - SparkConfUtils.getMetastoreUri(session.sparkContext().conf()) - .orElseThrow(() -> new UnsupportedCatalogException("hive")); - } else { - uri = new URI(confUri); - } - DatasetIdentifier metastoreIdentifier = - PathUtils.fromPath( - new Path(PathUtils.enrichHiveMetastoreURIWithTableName(uri, slashPrefixedTable))); - - return new DatasetIdentifier.Symlink( - metastoreIdentifier.getName(), - metastoreIdentifier.getNamespace(), - DatasetIdentifier.SymlinkType.TABLE); - } - - @SneakyThrows - private 
DatasetIdentifier.Symlink getRestIdentifier( - SparkSession session, @Nullable String confUri, String table) { - - String uri = new URI(confUri).toString(); - return new DatasetIdentifier.Symlink(table, uri, DatasetIdentifier.SymlinkType.TABLE); - } - - @Override - public Optional getStorageDatasetFacet( - Map properties) { - String format = properties.getOrDefault("format", ""); - return Optional.of( - context.getOpenLineage().newStorageDatasetFacet("iceberg", format.replace("iceberg/", ""))); - } - - @SneakyThrows - @Override - public Optional getDatasetVersion( - TableCatalog tableCatalog, Identifier identifier, Map properties) { - SparkTable table; - try { - table = (SparkTable) tableCatalog.loadTable(identifier); - } catch (NoSuchTableException | ClassCastException e) { - log.error("Failed to load table from catalog: {}", identifier, e); - return Optional.empty(); - } - - if (table.table() != null && table.table().currentSnapshot() != null) { - return Optional.of(Long.toString(table.table().currentSnapshot().snapshotId())); - } - return Optional.empty(); - } - - @Override - public String getName() { - return "iceberg"; - } -} diff --git a/metadata-integration/java/spark-lineage/README.md b/metadata-integration/java/spark-lineage/README.md index f56cb14a1ae546..041408aac6d6d3 100644 --- a/metadata-integration/java/spark-lineage/README.md +++ b/metadata-integration/java/spark-lineage/README.md @@ -1,4 +1,10 @@ -# Spark +# Spark (Legacy) + +:::note + +This is our legacy Spark Integration which is replaced by [Acryl Spark Lineage](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta) + +::: To integrate Spark with DataHub, we provide a lightweight Java agent that listens for Spark application and job events and pushes metadata out to DataHub in real-time. The agent listens to events such application start/end, and SQLExecution start/end to create pipelines (i.e. DataJob) and tasks (i.e. DataFlow) in Datahub along with lineage to datasets that are being read from and written to. Read on to learn how to configure this for different Spark scenarios.
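For orientation, a minimal PySpark session that wires in the listener described above might look like the sketch below. The artifact coordinate `io.acryl:datahub-spark-lineage` and the version shown are assumptions for illustration only; use the coordinates and configuration options documented in this README for real deployments.

```python
# Minimal sketch of attaching the legacy DataHub Spark listener.
# The package coordinate and version below are illustrative assumptions;
# substitute the release documented in this README.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("datahub-lineage-example")
    # Put the DataHub Spark agent on the driver classpath
    .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.12.0")
    # Register the listener that emits DataFlow/DataJob entities and dataset lineage
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    # Point the agent at your DataHub GMS endpoint
    .config("spark.datahub.rest.server", "http://localhost:8080")
    .getOrCreate()
)
```

With this in place, running any job through the session emits pipeline (DataFlow), task (DataJob), and dataset lineage metadata to the configured DataHub server.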