Skip to content

Commit

Permalink
feat(spark-lineage): add support for iceberg and cache based plans (d…
Browse files Browse the repository at this point in the history
…atahub-project#4882)

Co-authored-by: magzhu <[email protected]>
  • Loading branch information
maggie-zhu and magzhu authored May 16, 2022
1 parent f847fa3 commit 46760a7
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -28,6 +29,9 @@
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.columnar.InMemoryRelation;
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;

Expand Down Expand Up @@ -101,6 +105,9 @@ public void run() {

DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(), outputDS.get());
Collection<QueryPlan<?>> allInners = new ArrayList<>();
Collection<QueryPlan<?>> allInmemoryRelationSparkPlan = new ArrayList<>();
Collection<QueryPlan<?>> allInmemoryRelationInnersSparkPlan = new ArrayList<>();
Stack<QueryPlan<?>> allInmemoryRelationTableScanPlan = new Stack<>();

plan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {

Expand All @@ -110,6 +117,12 @@ public Void apply(LogicalPlan plan) {
Optional<? extends SparkDataset> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
inputDS.ifPresent(x -> lineage.addSource(x));
allInners.addAll(JavaConversions.asJavaCollection(plan.innerChildren()));

//deal with sparkPlans in complex logical plan
if (plan instanceof InMemoryRelation) {
InMemoryRelation cmd = (InMemoryRelation) plan;
allInmemoryRelationSparkPlan.add(cmd.cachedPlan());
}
return null;
}

Expand All @@ -134,6 +147,12 @@ public Void apply(LogicalPlan plan) {
inputDS.ifPresent(
x -> log.debug("source added for " + ctx.appName() + "/" + sqlStart.executionId() + ": " + x));
inputDS.ifPresent(x -> lineage.addSource(x));

//deal with sparkPlans in complex logical plan
if (plan instanceof InMemoryRelation) {
InMemoryRelation cmd = (InMemoryRelation) plan;
allInmemoryRelationSparkPlan.add(cmd.cachedPlan());
}
return null;
}

Expand All @@ -144,6 +163,107 @@ public boolean isDefinedAt(LogicalPlan x) {
});
}

// First pass over cached plans. Every entry in allInmemoryRelationSparkPlan was taken from
// InMemoryRelation.cachedPlan() while the logical plan was traversed; entries that are not
// SparkPlan instances are skipped.
for (QueryPlan<?> qpInmemoryRelation : allInmemoryRelationSparkPlan) {
if (!(qpInmemoryRelation instanceof SparkPlan)) {
continue;
}
SparkPlan sparkPlan = (SparkPlan) qpInmemoryRelation;
// The partial function is used purely for its side effects; apply() always returns null.
sparkPlan.collect(new AbstractPartialFunction<SparkPlan, Void>() {

@Override
public Void apply(SparkPlan sp) {
// Register the node as a lineage source when DatasetExtractor yields a dataset for it.
Optional<? extends SparkDataset> inputDSSp = DatasetExtractor.asDataset(sp, ctx, false);
inputDSSp.ifPresent(x -> lineage.addSource(x));
// Inner children are queued here and walked by the loop that follows this one.
allInmemoryRelationInnersSparkPlan.addAll(JavaConversions.asJavaCollection(sp.innerChildren()));

// Scans over cached data are stacked so the while loop further down can walk the
// cached plan behind each of them.
if (sp instanceof InMemoryTableScanExec) {
InMemoryTableScanExec cmd = (InMemoryTableScanExec) sp;
allInmemoryRelationTableScanPlan.push(cmd);
}
return null;
}
// Unconditionally true, so the function is defined for every node it is offered.
@Override
public boolean isDefinedAt(SparkPlan x) {
return true;
}
});
}

// Second pass: walk the inner children gathered by the previous loop. Entries that are not
// SparkPlan instances are skipped.
for (QueryPlan<?> qpInmemoryRelationInners : allInmemoryRelationInnersSparkPlan) {
if (!(qpInmemoryRelationInners instanceof SparkPlan)) {
continue;
}
SparkPlan sparkPlan = (SparkPlan) qpInmemoryRelationInners;
// Used only for side effects; apply() always returns null.
sparkPlan.collect(new AbstractPartialFunction<SparkPlan, Void>() {

@Override
public Void apply(SparkPlan sp) {
// Register the node as a lineage source when DatasetExtractor yields a dataset for it.
Optional<? extends SparkDataset> inputDSSp = DatasetExtractor.asDataset(sp, ctx, false);
inputDSSp.ifPresent(x -> lineage.addSource(x));

// Unlike the first pass, inner children of these nodes are not collected again;
// only scans over cached data are recorded for the while loop below.
if (sp instanceof InMemoryTableScanExec) {
InMemoryTableScanExec cmd = (InMemoryTableScanExec) sp;
allInmemoryRelationTableScanPlan.push(cmd);
}
return null;
}
// Unconditionally true, so the function is defined for every node it is offered.
@Override
public boolean isDefinedAt(SparkPlan x) {
return true;
}
});
}

// Work-list loop: each InMemoryTableScanExec popped from the stack has the cached plan of its
// relation walked. Scans found during that walk are pushed back onto the same stack, so the
// loop keeps going until no unvisited scan remains.
// NOTE(review): there is no visited-set here; termination presumably relies on cached plans
// never leading back to themselves — confirm.
while (!allInmemoryRelationTableScanPlan.isEmpty()) {
QueryPlan<?> qpInmemoryRelationTableScan = allInmemoryRelationTableScanPlan.pop();
// Only InMemoryTableScanExec instances are ever pushed, so this cast matches the pushes above.
InMemoryTableScanExec imPlan = (InMemoryTableScanExec) qpInmemoryRelationTableScan;
// Inner children found under this scan's cached plan; local to the current iteration.
Collection<QueryPlan<?>> allInnerPhysicalPlan = new ArrayList<>();
imPlan.relation().cachedPlan().collect(new AbstractPartialFunction<SparkPlan, Void>() {

@Override
public Void apply(SparkPlan sp) {
// Register the node as a lineage source when DatasetExtractor yields a dataset for it.
Optional<? extends SparkDataset> inputDSSp = DatasetExtractor.asDataset(sp, ctx, false);
inputDSSp.ifPresent(x -> lineage.addSource(x));
allInnerPhysicalPlan.addAll(JavaConversions.asJavaCollection(sp.innerChildren()));

// A scan nested inside this cached plan is queued for a later iteration.
if (sp instanceof InMemoryTableScanExec) {
InMemoryTableScanExec cmd = (InMemoryTableScanExec) sp;
allInmemoryRelationTableScanPlan.push(cmd);
}
return null;
}
// Unconditionally true, so the function is defined for every node it is offered.
@Override
public boolean isDefinedAt(SparkPlan x) {
return true;
}
});

// Walk the inner children collected just above; entries that are not SparkPlan are skipped.
for (QueryPlan<?> qpInner : allInnerPhysicalPlan) {
if (!(qpInner instanceof SparkPlan)) {
continue;
}
SparkPlan sparkPlan = (SparkPlan) qpInner;
sparkPlan.collect(new AbstractPartialFunction<SparkPlan, Void>() {

@Override
public Void apply(SparkPlan sp) {
// Register the node as a lineage source when DatasetExtractor yields a dataset for it.
Optional<? extends SparkDataset> inputDSSp = DatasetExtractor.asDataset(sp, ctx, false);
inputDSSp.ifPresent(x -> lineage.addSource(x));

// Nested scans found here are also queued for a later iteration of the while loop.
if (sp instanceof InMemoryTableScanExec) {
InMemoryTableScanExec cmd = (InMemoryTableScanExec) sp;
allInmemoryRelationTableScanPlan.push(cmd);
}
return null;
}
// Unconditionally true, so the function is defined for every node it is offered.
@Override
public boolean isDefinedAt(SparkPlan x) {
return true;
}
});
}
}

SQLQueryExecStartEvent evt =
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), lineage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@
import org.apache.spark.SparkContext;
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.FileSourceScanExec;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand;
import org.apache.spark.sql.execution.datasources.HadoopFsRelation;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec;
import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2;
import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand;
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable;
import org.apache.spark.sql.sources.BaseRelation;
Expand All @@ -43,10 +48,12 @@
public class DatasetExtractor {

private static final Map<Class<? extends LogicalPlan>, PlanToDataset> PLAN_TO_DATASET = new HashMap<>();
private static final Map<Class<? extends SparkPlan>, SparkPlanToDataset> SPARKPLAN_TO_DATASET = new HashMap<>();
private static final Map<Class<? extends BaseRelation>, RelationToDataset> REL_TO_DATASET = new HashMap<>();
private static final Set<Class<? extends LogicalPlan>> OUTPUT_CMD = ImmutableSet.of(
InsertIntoHadoopFsRelationCommand.class, SaveIntoDataSourceCommand.class,
CreateDataSourceTableAsSelectCommand.class, CreateHiveTableAsSelectCommand.class, InsertIntoHiveTable.class);
CreateDataSourceTableAsSelectCommand.class, CreateHiveTableAsSelectCommand.class, InsertIntoHiveTable.class,
WriteToDataSourceV2.class);
private static final String DATASET_ENV_KEY = "metadata.dataset.env";
private static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
// TODO InsertIntoHiveDirCommand, InsertIntoDataSourceDirCommand
Expand All @@ -58,6 +65,10 @@ private static interface PlanToDataset {
Optional<? extends SparkDataset> fromPlanNode(LogicalPlan plan, SparkContext ctx, Config datahubConfig);
}

/**
 * Strategy that turns one physical-plan node into a dataset, if it can.
 *
 * <p>Implementations are supplied as lambdas when entries are registered in
 * {@code SPARKPLAN_TO_DATASET}, so this type must keep exactly one abstract method;
 * {@code @FunctionalInterface} makes the compiler enforce that.
 */
@FunctionalInterface
private static interface SparkPlanToDataset {
  /**
   * @param plan the physical-plan node to inspect
   * @param ctx the Spark context passed through by the caller
   * @param datahubConfig the DataHub configuration passed through by the caller
   * @return the dataset derived from {@code plan}, or an empty {@code Optional} when none applies
   */
  Optional<? extends SparkDataset> fromSparkPlanNode(SparkPlan plan, SparkContext ctx, Config datahubConfig);
}

/**
 * Strategy that turns one {@code BaseRelation} into a dataset, if it can.
 *
 * <p>Implementations are supplied as lambdas when entries are registered in
 * {@code REL_TO_DATASET}, so this type must keep exactly one abstract method;
 * {@code @FunctionalInterface} makes the compiler enforce that.
 */
@FunctionalInterface
private static interface RelationToDataset {
  /**
   * @param rel the relation to inspect
   * @param ctx the Spark context passed through by the caller
   * @param datahubConfig the DataHub configuration passed through by the caller
   * @return the dataset derived from {@code rel}, or an empty {@code Optional} when none applies
   */
  Optional<? extends SparkDataset> fromRelation(BaseRelation rel, SparkContext ctx, Config datahubConfig);
}
Expand Down Expand Up @@ -112,6 +123,45 @@ private static interface RelationToDataset {
return Optional.of(new CatalogTableDataset(cmd.tableMeta(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
});

// A DataSource V2 write is reported as an "iceberg" dataset only when the writer's string
// form mentions "IcebergWrite"; every other writer produces no dataset.
PLAN_TO_DATASET.put(WriteToDataSourceV2.class, (p, ctx, datahubConfig) -> {
  String writerText = ((WriteToDataSourceV2) p).writer().toString();
  if (writerText.contains("IcebergWrite")) {
    // Table name = last dot-separated token of the text that precedes the first comma.
    String[] dotted = writerText.split(",")[0].split("\\.");
    String table = dotted[dotted.length - 1];
    return Optional.of(new CatalogTableDataset("iceberg", table, getCommonPlatformInstance(datahubConfig),
        getCommonFabricType(datahubConfig)));
  }
  return Optional.empty();
});

// A DataSource V2 relation is reported as an "iceberg" dataset when the source's string form
// mentions "IcebergSource" or "iceberg". The table name is the last dot-separated token of the
// "path" option.
PLAN_TO_DATASET.put(DataSourceV2Relation.class, (p, ctx, datahubConfig) -> {
  DataSourceV2Relation cmd = (DataSourceV2Relation) p;
  String sourceText = cmd.source().toString();
  if (!sourceText.contains("IcebergSource") && !sourceText.contains("iceberg")) {
    return Optional.empty();
  }
  // options().get(...) returns a Scala Option; calling get() on an empty one throws
  // NoSuchElementException, so a relation without a "path" option yields no dataset instead.
  scala.Option<String> path = cmd.options().get("path");
  if (path.isEmpty()) {
    return Optional.empty();
  }
  String[] names = path.get().split("\\.");
  // String.split drops trailing empty tokens, so a path made only of dots gives an empty array.
  if (names.length == 0) {
    return Optional.empty();
  }
  String tableName = names[names.length - 1];
  return Optional.of(new CatalogTableDataset("iceberg", tableName, getCommonPlatformInstance(datahubConfig),
      getCommonFabricType(datahubConfig)));
});

// A DataSource V2 scan node is reported as an "iceberg" dataset when the node's string form
// mentions "iceberg". The table name is the last dot-separated token of the "path" option.
SPARKPLAN_TO_DATASET.put(DataSourceV2ScanExec.class, (sp, ctx, datahubConfig) -> {
  if (!sp.toString().contains("iceberg")) {
    return Optional.empty();
  }
  DataSourceV2ScanExec cmd = (DataSourceV2ScanExec) sp;
  // options().get(...) returns a Scala Option; calling get() on an empty one throws
  // NoSuchElementException, so a scan without a "path" option yields no dataset instead.
  scala.Option<String> path = cmd.options().get("path");
  if (path.isEmpty()) {
    return Optional.empty();
  }
  String[] names = path.get().split("\\.");
  // String.split drops trailing empty tokens, so a path made only of dots gives an empty array.
  if (names.length == 0) {
    return Optional.empty();
  }
  String tableName = names[names.length - 1];
  return Optional.of(new CatalogTableDataset("iceberg", tableName, getCommonPlatformInstance(datahubConfig),
      getCommonFabricType(datahubConfig)));
});

SPARKPLAN_TO_DATASET.put(FileSourceScanExec.class, (sp, ctx, datahubConfig) -> {
FileSourceScanExec cmd = (FileSourceScanExec) sp;
String tableName = cmd.tableIdentifier().get().table();
return Optional.of(new CatalogTableDataset("hive", tableName, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
});

REL_TO_DATASET.put(HadoopFsRelation.class, (r, ctx, datahubConfig) -> {
List<Path> res = JavaConversions.asJavaCollection(((HadoopFsRelation) r).location().rootPaths()).stream()
.map(p -> getDirectoryPath(p, ctx.hadoopConfiguration())).distinct().collect(Collectors.toList());
Expand Down Expand Up @@ -142,6 +192,15 @@ static Optional<? extends SparkDataset> asDataset(LogicalPlan logicalPlan, Spark
return PLAN_TO_DATASET.get(logicalPlan.getClass()).fromPlanNode(logicalPlan, ctx, datahubconfig);
}

/**
 * Resolves a physical-plan node to a dataset using the extractor registered for the node's
 * exact runtime class.
 *
 * @param sparkPlan the physical-plan node to inspect
 * @param ctx the Spark context handed to the extractor
 * @param outputNode not read by this overload
 * @return the extractor's result, or an empty {@code Optional} when no extractor is registered
 *     for {@code sparkPlan.getClass()}
 */
static Optional<? extends SparkDataset> asDataset(SparkPlan sparkPlan, SparkContext ctx, boolean outputNode) {
  SparkPlanToDataset extractor = SPARKPLAN_TO_DATASET.get(sparkPlan.getClass());
  if (extractor == null) {
    // No extractor for this node type: bail out before parsing any configuration.
    return Optional.empty();
  }
  Config parsedConfig = LineageUtils.parseSparkConfig();
  return extractor.fromSparkPlanNode(sparkPlan, ctx, parsedConfig);
}

private static Path getDirectoryPath(Path p, Configuration hadoopConf) {
try {
if (p.getFileSystem(hadoopConf).getFileStatus(p).isFile()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ public CatalogTableDataset(String dsName, String platformInstance, FabricType fa
super("hive", platformInstance, dsName, fabricType);
}

/**
 * Creates a dataset whose first superclass argument is supplied by the caller instead of
 * being fixed.
 *
 * <p>NOTE(review): despite its name, {@code dbName} is forwarded in the superclass slot where
 * the three-argument constructor passes the literal {@code "hive"}, so it appears to act as
 * the platform identifier rather than a database name — confirm against the superclass.
 *
 * @param dbName value forwarded as the first superclass constructor argument
 * @param dsName dataset name forwarded to the superclass
 * @param platformInstance platform instance forwarded to the superclass
 * @param fabricType fabric type forwarded to the superclass
 */
public CatalogTableDataset(String dbName, String dsName, String platformInstance, FabricType fabricType) {
super(dbName, platformInstance, dsName, fabricType);
}
}

0 comments on commit 46760a7

Please sign in to comment.