Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Optimization-3923][core] Optimize flinksql column lineage #3924

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public Result<String> exportSql(@RequestParam Integer id) {
dataType = "Integer",
paramType = "query",
dataTypeClass = Integer.class)
public Result getTaskLineage(@RequestParam Integer id) {
// NOTE(review): raw `Result` type — consider `Result<LineageResult>` for type safety,
// matching the generic `Result<String>` style used by sibling endpoints.
public Result getTaskLineage(@RequestParam Integer id) throws NotSupportExplainExcepition {
    // Presumably initializes the tenant context from the task id before querying — verify against TaskService.
    taskService.initTenantByTaskId(id);
    return Result.succeed(taskService.getTaskLineage(id), Status.QUERY_SUCCESS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public interface TaskService extends ISuperService<Task> {
* @param id The id of the task to get.
* @return A {@link LineageResult} object representing the found task lineage.
*/
LineageResult getTaskLineage(Integer id);
LineageResult getTaskLineage(Integer id) throws NotSupportExplainExcepition;

/**
* Build the job submit config with the given task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.dinky.data.model.mapping.ClusterInstanceMapping;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.vo.task.JobInstanceVo;
import org.dinky.executor.ExecutorConfig;
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.job.FlinkJobTask;
Expand Down Expand Up @@ -289,7 +288,7 @@ public void refreshJobByTaskIds(Integer... taskIds) {
@Override
public LineageResult getLineage(Integer id) {
History history = getJobInfoDetail(id).getHistory();
return LineageBuilder.getColumnLineageByLogicalPlan(history.getStatement(), ExecutorConfig.DEFAULT);
return LineageBuilder.getColumnLineageByLogicalPlan(history.getStatement());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@
import org.dinky.data.model.udf.UDFTemplate;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.explainer.sqllineage.SQLLineageBuilder;
import org.dinky.function.FunctionFactory;
import org.dinky.function.compiler.CustomStringJavaCompiler;
import org.dinky.function.data.model.UDF;
Expand Down Expand Up @@ -980,27 +978,10 @@ public Result<Tree<Integer>> queryAllCatalogue() {
}

@Override
public LineageResult getTaskLineage(Integer id) {
public LineageResult getTaskLineage(Integer id) throws NotSupportExplainExcepition {
TaskDTO task = getTaskInfoById(id);
if (!Dialect.isCommonSql(task.getDialect())) {
if (Asserts.isNull(task.getDatabaseId())) {
return null;
}
DataBase dataBase = dataBaseService.getById(task.getDatabaseId());
if (Asserts.isNull(dataBase)) {
return null;
}
if (task.getDialect().equalsIgnoreCase("doris") || task.getDialect().equalsIgnoreCase("starrocks")) {
return SQLLineageBuilder.getSqlLineage(task.getStatement(), "mysql", dataBase.getDriverConfig());
} else {
return SQLLineageBuilder.getSqlLineage(
task.getStatement(), task.getDialect().toLowerCase(), dataBase.getDriverConfig());
}
} else {
task.setStatement(buildEnvSql(task) + task.getStatement());
JobConfig jobConfig = task.getJobConfig();
return LineageBuilder.getColumnLineageByLogicalPlan(task.getStatement(), jobConfig);
}
BaseTask baseTask = BaseTask.getTask(task);
return baseTask.getColumnLineage();
}

private List<TreeNode<Integer>> dealWithCatalogue(List<Catalogue> catalogueList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.job.JobResult;

import java.util.List;
Expand Down Expand Up @@ -57,6 +58,11 @@ public ObjectNode getJobPlan() throws NotSupportExplainExcepition {
StrFormatter.format("task [{}] dialect [{}] is can not getJobPlan", task.getName(), task.getDialect()));
}

/**
 * Default column-lineage hook for task dialects that do not support it.
 * Always rejects the request; subclasses that can compute column lineage
 * (e.g. SQL or Flink SQL tasks) override this method.
 *
 * @return never returns normally in this base implementation
 * @throws NotSupportExplainExcepition always, naming the task and its dialect
 */
public LineageResult getColumnLineage() throws NotSupportExplainExcepition {
    String reason = StrFormatter.format(
            "task [{}] dialect [{}] is can not get column lineage", task.getName(), task.getDialect());
    throw new NotSupportExplainExcepition(reason);
}

public static BaseTask getTask(TaskDTO taskDTO) {
Set<Class<?>> classes =
ClassUtil.scanPackageBySuper(BaseTask.class.getPackage().getName(), BaseTask.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@

package org.dinky.service.task;

import org.dinky.assertion.Asserts;
import org.dinky.config.Dialect;
import org.dinky.data.annotations.SupportDialect;
import org.dinky.data.dto.SqlDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.model.DataBase;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.explainer.sqllineage.SQLLineageBuilder;
import org.dinky.job.JobResult;
import org.dinky.service.DataBaseService;
import org.dinky.service.impl.DataBaseServiceImpl;

import java.util.List;

Expand Down Expand Up @@ -80,4 +85,21 @@ public JobResult StreamExecute() {
public boolean stop() {
return false;
}

/**
 * Builds column lineage for a plain-SQL task by running its statement through
 * {@link SQLLineageBuilder} against the task's bound data source.
 *
 * @return the parsed {@link LineageResult}, or {@code null} when the task has no
 *         database bound or the referenced database no longer exists
 */
@Override
public LineageResult getColumnLineage() {
    // No data source bound to the task: lineage cannot be resolved.
    if (Asserts.isNull(task.getDatabaseId())) {
        return null;
    }
    DataBaseService dataBaseService = SpringUtil.getBean(DataBaseServiceImpl.class);
    DataBase dataBase = dataBaseService.getById(task.getDatabaseId());
    if (Asserts.isNull(dataBase)) {
        // Stale reference: the configured database has been removed.
        return null;
    }
    // Doris and StarRocks are treated as MySQL-compatible for lineage parsing.
    if (task.getDialect().equalsIgnoreCase("doris") || task.getDialect().equalsIgnoreCase("starrocks")) {
        return SQLLineageBuilder.getSqlLineage(task.getStatement(), "mysql", dataBase.getDriverConfig());
    } else {
        return SQLLineageBuilder.getSqlLineage(
                task.getStatement(), task.getDialect().toLowerCase(), dataBase.getDriverConfig());
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.job.JobManager;
import org.dinky.job.JobResult;
import org.dinky.service.TaskService;
Expand Down Expand Up @@ -62,6 +63,10 @@ public ObjectNode getJobPlan() {
return JsonUtils.parseObject(planJson);
}

/**
 * Resolves column lineage for a Flink SQL task by delegating to the job
 * manager, which derives it from the statement's logical plan.
 *
 * @return the column-level {@link LineageResult} for this task's statement
 */
@Override
public LineageResult getColumnLineage() {
    return jobManager.getColumnLineage(task.getStatement());
}

@Override
public JobResult execute() throws Exception {
log.info("Initializing Flink job config...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,23 @@

package org.dinky.utils;

import org.dinky.data.model.FunctionResult;
import org.dinky.data.model.LineageRel;
import org.dinky.executor.CustomParser;
import org.dinky.executor.CustomTableEnvironment;

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ContextResolvedFunction;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -85,7 +76,6 @@ private Tuple2<String, RelNode> parseStatement(String sql) {
Operation operation = operations.get(0);
if (operation instanceof SinkModifyOperation) {
SinkModifyOperation sinkOperation = (SinkModifyOperation) operation;

PlannerQueryOperation queryOperation = (PlannerQueryOperation) sinkOperation.getChild();
RelNode relNode = queryOperation.getCalciteTree();
return new Tuple2<>(
Expand Down Expand Up @@ -156,50 +146,4 @@ private List<LineageRel> buildFiledLineageResult(String sinkTable, RelNode optRe
}
return resultList;
}

/**
* Analyze custom functions from SQL, does not contain system functions.
*
* @param singleSql the SQL statement to analyze
* @return custom functions set
*/
public Set<FunctionResult> analyzeFunction(String singleSql) {
LOG.info("Analyze function Sql: \n {}", singleSql);
CustomParser parser = (CustomParser) tableEnv.getParser();

// parsing sql and return the abstract syntax tree
SqlNode sqlNode = parser.parseSql(singleSql);

// validate the query
SqlNode validated = parser.validate(sqlNode);

// look for all functions
FunctionVisitor visitor = new FunctionVisitor();
validated.accept(visitor);
List<UnresolvedIdentifier> fullFunctionList = visitor.getFunctionList();

// filter custom functions
Set<FunctionResult> resultSet = new HashSet<>();
for (UnresolvedIdentifier unresolvedIdentifier : fullFunctionList) {
getFunctionCatalog()
.lookupFunction(unresolvedIdentifier)
.flatMap(ContextResolvedFunction::getIdentifier)
// the objectIdentifier of the built-in function is null
.flatMap(FunctionIdentifier::getIdentifier)
.ifPresent(identifier -> {
FunctionResult functionResult = new FunctionResult()
.setCatalogName(identifier.getCatalogName())
.setDatabase(identifier.getDatabaseName())
.setFunctionName(identifier.getObjectName());
LOG.debug("analyzed function: {}", functionResult);
resultSet.add(functionResult);
});
}
return resultSet;
}

private FunctionCatalog getFunctionCatalog() {
PlannerBase planner = (PlannerBase) tableEnv.getPlanner();
return planner.getFlinkContext().getFunctionCatalog();
}
}
11 changes: 2 additions & 9 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,15 @@
public class Explainer {

private Executor executor;
private boolean useStatementSet;
private JobManager jobManager;

public Explainer(Executor executor, boolean useStatementSet, JobManager jobManager) {
public Explainer(Executor executor, JobManager jobManager) {
this.executor = executor;
this.useStatementSet = useStatementSet;
this.jobManager = jobManager;
}

public static Explainer build(JobManager jobManager) {
return new Explainer(jobManager.getExecutor(), true, jobManager);
}

public static Explainer build(Executor executor, boolean useStatementSet, JobManager jobManager) {
return new Explainer(executor, useStatementSet, jobManager);
return new Explainer(jobManager.getExecutor(), jobManager);
}

public Explainer initialize(JobConfig config, String statement) {
Expand Down Expand Up @@ -229,7 +223,6 @@ public List<LineageRel> getLineage(String statement) {
.type(GatewayType.LOCAL.getLongValue())
.useRemote(false)
.fragment(true)
.statementSet(useStatementSet)
.parallelism(1)
.configJson(executor.getTableConfig().getConfiguration().toMap())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
package org.dinky.explainer.lineage;

import org.dinky.data.model.LineageRel;
import org.dinky.executor.Executor;
import org.dinky.executor.ExecutorConfig;
import org.dinky.executor.ExecutorFactory;
import org.dinky.explainer.Explainer;
import org.dinky.job.JobConfig;
import org.dinky.job.JobManager;
Expand All @@ -41,20 +38,17 @@ public class LineageBuilder {

public static LineageResult getColumnLineageByLogicalPlan(String statement, JobConfig jobConfig) {
JobManager jobManager = JobManager.buildPlanMode(jobConfig);
Explainer explainer = new Explainer(jobManager.getExecutor(), false, jobManager);
return getColumnLineageByLogicalPlan(statement, explainer);
Explainer explainer = Explainer.build(jobManager);
return getColumnLineageByLogicalPlan(explainer.getLineage(statement));
}

public static LineageResult getColumnLineageByLogicalPlan(String statement, ExecutorConfig executorConfig) {
public static LineageResult getColumnLineageByLogicalPlan(String statement) {
JobManager jobManager = JobManager.buildPlanMode(JobConfig.buildPlanConfig());
Executor executor = ExecutorFactory.buildExecutor(executorConfig, jobManager.getDinkyClassLoader());
jobManager.setExecutor(executor);
Explainer explainer = new Explainer(executor, false, jobManager);
return getColumnLineageByLogicalPlan(statement, explainer);
Explainer explainer = Explainer.build(jobManager);
return getColumnLineageByLogicalPlan(explainer.getLineage(statement));
}

public static LineageResult getColumnLineageByLogicalPlan(String statement, Explainer explainer) {
List<LineageRel> lineageRelList = explainer.getLineage(statement);
public static LineageResult getColumnLineageByLogicalPlan(List<LineageRel> lineageRelList) {
List<LineageRelation> relations = new ArrayList<>();
Map<String, LineageTable> tableMap = new HashMap<>();
int tableIndex = 1;
Expand Down
7 changes: 7 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.dinky.executor.ExecutorConfig;
import org.dinky.executor.ExecutorFactory;
import org.dinky.explainer.Explainer;
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.function.util.UDFUtil;
import org.dinky.gateway.Gateway;
import org.dinky.gateway.config.FlinkConfig;
Expand Down Expand Up @@ -382,6 +384,11 @@ public String getJobPlanJson(String statement) {
return Explainer.build(this).getJobPlanInfo(statement).getJsonPlan();
}

/**
 * Computes column-level lineage for the given statement: extracts lineage
 * relations via the {@link Explainer} and assembles them into a result with
 * {@link LineageBuilder}.
 *
 * @param statement the SQL statement(s) to analyze
 * @return the assembled column lineage result
 */
public LineageResult getColumnLineage(String statement) {
    return LineageBuilder.getColumnLineageByLogicalPlan(
            Explainer.build(this).getLineage(statement));
}

public boolean cancelNormal(String jobId) {
try {
return FlinkAPI.build(config.getAddress()).stop(jobId);
Expand Down
3 changes: 1 addition & 2 deletions dinky-core/src/test/java/org/dinky/core/LineageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.dinky.core;

import org.dinky.executor.ExecutorConfig;
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;

Expand Down Expand Up @@ -56,7 +55,7 @@ public void sumTest() {
+ " 'connector' = 'print'\n"
+ ");\n"
+ "insert into TT select a||c A ,b||c B from ST";
LineageResult result = LineageBuilder.getColumnLineageByLogicalPlan(sql, ExecutorConfig.DEFAULT);
LineageResult result = LineageBuilder.getColumnLineageByLogicalPlan(sql);
LOGGER.info("end");
}
}
3 changes: 1 addition & 2 deletions dinky-core/src/test/java/org/dinky/job/JobManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.dinky.data.enums.GatewayType;
import org.dinky.data.result.ExplainResult;
import org.dinky.executor.ExecutorConfig;
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;

Expand Down Expand Up @@ -111,7 +110,7 @@ void testExecuteSql() throws Exception {
void testLineageSqlSingle() throws Exception {
String statement =
IOUtils.toString(Resources.getResource("flink/sql/single-insert.sql"), StandardCharsets.UTF_8);
LineageResult result = LineageBuilder.getColumnLineageByLogicalPlan(statement, ExecutorConfig.DEFAULT);
LineageResult result = LineageBuilder.getColumnLineageByLogicalPlan(statement);
assertNotNull(result);
assertEquals(2, result.getTables().size());
assertEquals(4, result.getRelations().size());
Expand Down
Loading