Skip to content

Commit

Permalink
[Feature] List Partition For AMV(Part 2): Support list partition for …
Browse files Browse the repository at this point in the history
…asynchronous materialized view with non-nullable partition columns (backport #46680) (#47712)

Signed-off-by: shuming.li <[email protected]>
Co-authored-by: shuming.li <[email protected]>
  • Loading branch information
mergify[bot] and LiShuMing authored Jul 1, 2024
1 parent f37cc4c commit 98d9a98
Show file tree
Hide file tree
Showing 27 changed files with 3,932 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public void addListPartitionKeys(String partitionName,
nameToPartKeys.put(partitionName, listPartitionKey);
}

/**
 * Registers a batch of list partition keys, keyed by partition name.
 * An entry whose name is already registered replaces the previous value.
 *
 * @param listPartitionKeys partition name to list partition cell mappings to register
 */
public void addListPartitionKeys(Map<String, PListCell> listPartitionKeys) {
    for (Map.Entry<String, PListCell> nameAndCell : listPartitionKeys.entrySet()) {
        nameToPartKeys.put(nameAndCell.getKey(), nameAndCell.getValue());
    }
}

/**
* Get the partition name with its associated range partition key when the mv is range partitioned.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.Range;
import com.starrocks.analysis.Expr;
import com.starrocks.catalog.mv.MVTimelinessArbiter;
import com.starrocks.catalog.mv.MVTimelinessListPartitionArbiter;
import com.starrocks.catalog.mv.MVTimelinessNonPartitionArbiter;
import com.starrocks.catalog.mv.MVTimelinessRangePartitionArbiter;
import com.starrocks.common.AnalysisException;
Expand Down Expand Up @@ -104,6 +105,8 @@ public static MVTimelinessArbiter buildMVTimelinessArbiter(MaterializedView mv,
return new MVTimelinessNonPartitionArbiter(mv, isQueryRewrite);
} else if (partitionInfo.isRangePartition()) {
return new MVTimelinessRangePartitionArbiter(mv, isQueryRewrite);
} else if (partitionInfo.isListPartition()) {
return new MVTimelinessListPartitionArbiter(mv, isQueryRewrite);
} else {
throw UnsupportedException.unsupportedException("unsupported partition info type:" +
partitionInfo.getClass().getName());
Expand All @@ -114,10 +117,10 @@ public static MVTimelinessArbiter buildMVTimelinessArbiter(MaterializedView mv,
* Check whether mv needs to refresh based on the ref base table. It's a shortcut version of getMvBaseTableUpdateInfo.
* @return Optional<Boolean> : true if it needs to refresh, false if not, empty if the result is unknown.
*/
public static Optional<Boolean> needsToRefreshTable(MaterializedView mv,
Table baseTable,
boolean withMv,
boolean isQueryRewrite) {
private static Optional<Boolean> needsToRefreshTable(MaterializedView mv,
Table baseTable,
boolean withMv,
boolean isQueryRewrite) {
if (baseTable.isView()) {
// do nothing
return Optional.of(false);
Expand Down Expand Up @@ -195,9 +198,7 @@ public static MvBaseTableUpdateInfo getMvBaseTableUpdateInfo(MaterializedView mv
if (mvPartitionInfo.isListPartition()) {
Map<String, PListCell> partitionNameWithRange = getMVPartitionNameWithList(baseTable,
partitionColumn, updatedPartitionNamesList);
for (Map.Entry<String, PListCell> e : partitionNameWithRange.entrySet()) {
baseTableUpdateInfo.addListPartitionKeys(e.getKey(), e.getValue());
}
baseTableUpdateInfo.addListPartitionKeys(partitionNameWithRange);
baseTableUpdateInfo.addToRefreshPartitionNames(partitionNameWithRange.keySet());
} else if (mvPartitionInfo.isRangePartition()) {
Expr partitionExpr = MaterializedView.getPartitionExpr(mv);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

package com.starrocks.catalog.mv;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvBaseTableUpdateInfo;
import com.starrocks.catalog.MvUpdateInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TableProperty;
Expand All @@ -29,6 +31,7 @@
import java.util.Map;
import java.util.Set;

import static com.starrocks.catalog.MvRefreshArbiter.getMvBaseTableUpdateInfo;
import static com.starrocks.catalog.MvRefreshArbiter.needsToRefreshTable;

/**
Expand Down Expand Up @@ -147,4 +150,22 @@ protected Set<String> getMVToRefreshPartitionNames(
}
return needRefreshMvPartitionNames;
}

/**
 * Gathers, for every ref base table, the partition names reported as needing refresh.
 * As a side effect, each table's {@link MvBaseTableUpdateInfo} is stored into
 * {@code mvUpdateInfo.getBaseTableUpdateInfos()}.
 *
 * @param refBaseTableAndColumns ref base tables of the mv and their columns; only the tables (keys) are read here
 * @param mvUpdateInfo           update info that receives one base-table update-info entry per ref base table
 * @return ref base table to its to-refresh partition names
 */
protected Map<Table, Set<String>> collectBaseTableUpdatePartitionNames(Map<Table, Column> refBaseTableAndColumns,
                                                                       MvUpdateInfo mvUpdateInfo) {
    Map<Table, Set<String>> changedPartitionsByTable = Maps.newHashMap();
    // The column values are not needed; walking the key set visits the tables in the same order as the entry set.
    for (Table refTable : refBaseTableAndColumns.keySet()) {
        MvBaseTableUpdateInfo tableUpdateInfo = getMvBaseTableUpdateInfo(mv, refTable, true, true);
        mvUpdateInfo.getBaseTableUpdateInfos().put(refTable, tableUpdateInfo);
        changedPartitionsByTable.put(refTable, tableUpdateInfo.getToRefreshPartitionNames());
    }
    return changedPartitionsByTable;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.catalog.mv;

import com.google.common.base.Preconditions;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.ListPartitionInfo;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvUpdateInfo;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TableProperty;
import com.starrocks.common.AnalysisException;
import com.starrocks.sql.common.ListPartitionDiff;
import com.starrocks.sql.common.ListPartitionDiffResult;
import com.starrocks.sql.common.ListPartitionDiffer;
import com.starrocks.sql.common.PListCell;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVPrepare;

/**
 * Timeliness arbiter for materialized views whose partition info is a {@link ListPartitionInfo}.
 * It decides which mv partitions have to be refreshed by diffing the mv's list partitions against
 * the partitions of its ref base tables.
 */
public final class MVTimelinessListPartitionArbiter extends MVTimelinessArbiter {
    private static final Logger LOG = LogManager.getLogger(MVTimelinessListPartitionArbiter.class);

    public MVTimelinessListPartitionArbiter(MaterializedView mv, boolean isQueryRewrite) {
        super(mv, isQueryRewrite);
    }

    /**
     * Computes the mv update info in checked mode.
     *
     * @return a FULL refresh info when non-ref base tables require a refresh or when the list partition
     *         diff cannot be computed; otherwise a PARTIAL refresh info carrying the mv partitions to
     *         refresh and the base/mv partition name mappings
     * @throws AnalysisException declared by the overridden contract
     * @throws RuntimeException  if the mv has no related partition table/column; the mv is marked
     *                           inactive before the exception is thrown
     */
    @Override
    public MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisException {
        PartitionInfo partitionInfo = mv.getPartitionInfo();
        Preconditions.checkState(partitionInfo instanceof ListPartitionInfo);
        Map<Table, Column> refBaseTableAndColumns = mv.getRelatedPartitionTableAndColumn();
        if (refBaseTableAndColumns.isEmpty()) {
            mv.setInactiveAndReason("partition configuration changed");
            LOG.warn("mark mv:{} inactive for get partition info failed", mv.getName());
            throw new RuntimeException(String.format("getting partition info failed for mv: %s", mv.getName()));
        }

        // if it needs to refresh based on non-ref base tables, return full refresh directly.
        boolean isRefreshBasedOnNonRefTables = needsRefreshOnNonRefBaseTables(refBaseTableAndColumns);
        logMVPrepare(mv, "Is refresh based on non-ref base table:{}", isRefreshBasedOnNonRefTables);
        if (isRefreshBasedOnNonRefTables) {
            return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
        }

        ListPartitionDiffResult result = ListPartitionDiffer.computeListPartitionDiff(mv);
        // The loose path guards against a null diff on the same result type; apply the same guard here and
        // fall back to a full refresh instead of failing with a NullPointerException below.
        if (result == null || result.listPartitionDiff == null) {
            logMVPrepare(mv, "Partitioned mv compute list diff failed");
            return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
        }

        MvUpdateInfo mvTimelinessInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL);
        final Map<Table, Map<String, PListCell>> refBaseTablePartitionMap = result.refBaseTablePartitionMap;
        final ListPartitionDiff listPartitionDiff = result.listPartitionDiff;

        // update into mv's to refresh partitions
        Set<String> mvToRefreshPartitionNames = mvTimelinessInfo.getMvToRefreshPartitionNames();
        mvToRefreshPartitionNames.addAll(listPartitionDiff.getDeletes().keySet());
        // remove ref base table's deleted partitions from the mv partition map
        Map<String, PListCell> mvPartitionNameToListMap = mv.getListPartitionItems();
        listPartitionDiff.getDeletes().keySet().forEach(mvPartitionNameToListMap::remove);
        // refresh ref base table's newly added partitions
        mvToRefreshPartitionNames.addAll(listPartitionDiff.getAdds().keySet());
        mvPartitionNameToListMap.putAll(listPartitionDiff.getAdds());

        // build the base <-> mv partition name references from the adjusted mv partition map
        final Map<Table, List<Integer>> refBaseTableRefIdxMap = result.refBaseTableRefIdxMap;
        Map<Table, Map<String, Set<String>>> baseToMvNameRef = ListPartitionDiffer
                .generateBaseRefMap(refBaseTablePartitionMap, refBaseTableRefIdxMap, mvPartitionNameToListMap);
        Map<String, Map<Table, Set<String>>> mvToBaseNameRef = ListPartitionDiffer
                .generateMvRefMap(mvPartitionNameToListMap, refBaseTableRefIdxMap, refBaseTablePartitionMap);
        mvTimelinessInfo.getBasePartToMvPartNames().putAll(baseToMvNameRef);
        mvTimelinessInfo.getMvPartToBasePartNames().putAll(mvToBaseNameRef);

        // update mv's to refresh partitions based on base table's partition changes
        Map<Table, Set<String>> baseChangedPartitionNames =
                collectBaseTableUpdatePartitionNames(refBaseTableAndColumns, mvTimelinessInfo);
        mvToRefreshPartitionNames.addAll(getMVToRefreshPartitionNames(baseChangedPartitionNames, baseToMvNameRef));

        return mvTimelinessInfo;
    }

    /**
     * Computes the mv update info in loose mode: only partitions newly added according to the list
     * partition diff are marked as to-refresh.
     *
     * @return a PARTIAL/LOOSE update info holding the added partition names; a FULL refresh info when the
     *         diff result is null; {@code null} when computing the diff throws or the diff itself is null
     */
    @Override
    public MvUpdateInfo getMVTimelinessUpdateInfoInLoose() {
        MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL,
                TableProperty.QueryRewriteConsistencyMode.LOOSE);
        ListPartitionDiff listPartitionDiff = null;
        try {
            ListPartitionDiffResult result = ListPartitionDiffer.computeListPartitionDiff(mv);
            if (result == null) {
                logMVPrepare(mv, "Partitioned mv compute list diff failed");
                return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
            }
            listPartitionDiff = result.listPartitionDiff;
        } catch (Exception e) {
            LOG.warn("Materialized view compute partition difference with base table failed.", e);
            return null;
        }
        if (listPartitionDiff == null) {
            // This arbiter handles list partitions; the message previously said "range partition".
            LOG.warn("Materialized view compute partition difference with base table failed, the diff of list partition" +
                    " is null.");
            return null;
        }
        // In loose mode only the added partitions are marked as to-refresh.
        mvUpdateInfo.getMvToRefreshPartitionNames().addAll(listPartitionDiff.getAdds().keySet());
        return mvUpdateInfo;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
package com.starrocks.catalog.mv;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.FunctionCallExpr;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvBaseTableUpdateInfo;
import com.starrocks.catalog.MvUpdateInfo;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionKey;
Expand All @@ -42,7 +40,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.catalog.MvRefreshArbiter.getMvBaseTableUpdateInfo;
import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVPrepare;

/**
Expand Down Expand Up @@ -77,7 +74,7 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExcep
}

// There may be a performance issue here, because it will fetch all partitions of base tables and mv partitions.
RangePartitionDiffResult differ = RangePartitionDiffer.computeRangePartitionDiff(mv);
RangePartitionDiffResult differ = RangePartitionDiffer.computeRangePartitionDiff(mv, null, true);
if (differ == null) {
throw new AnalysisException(String.format("Compute partition difference of mv %s with base table failed.",
mv.getName()));
Expand Down Expand Up @@ -130,32 +127,14 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExcep
return mvTimelinessInfo;
}

/**
 * Collects the update info of every ref base table and extracts its to-refresh partition names.
 * Each table's update info is also recorded into {@code mvUpdateInfo.getBaseTableUpdateInfos()}.
 *
 * @param refBaseTableAndColumns ref base table and columns of mv
 * @param mvUpdateInfo           mv update info that receives the per-table update infos
 * @return ref base table's changed partition names
 */
private Map<Table, Set<String>> collectBaseTableUpdatePartitionNames(Map<Table, Column> refBaseTableAndColumns,
                                                                     MvUpdateInfo mvUpdateInfo) {
    Map<Table, Set<String>> partitionNamesPerTable = Maps.newHashMap();
    refBaseTableAndColumns.forEach((table, ignoredColumn) -> {
        MvBaseTableUpdateInfo updateInfo = getMvBaseTableUpdateInfo(mv, table, true, true);
        mvUpdateInfo.getBaseTableUpdateInfos().put(table, updateInfo);
        partitionNamesPerTable.put(table, updateInfo.getToRefreshPartitionNames());
    });
    return partitionNamesPerTable;
}

@Override
protected MvUpdateInfo getMVTimelinessUpdateInfoInLoose() {
MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL,
TableProperty.QueryRewriteConsistencyMode.LOOSE);
RangePartitionDiff rangePartitionDiff = null;
try {
// There may be a performance issue here, because it will fetch all partitions of base tables and mv partitions.
RangePartitionDiffResult differ = RangePartitionDiffer.computeRangePartitionDiff(mv);
RangePartitionDiffResult differ = RangePartitionDiffer.computeRangePartitionDiff(mv, null, true);
if (differ == null) {
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.UNKNOWN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,36 @@ public void setReplay(boolean replay) {
isReplay = replay;
}

/**
 * Tells whether {@code taskRunProperties} maps {@code key} to a non-null value.
 * This method does not null-check {@code taskRunProperties} itself.
 *
 * @param key property name to look up
 * @return true only if the key is present and its value is not null
 */
private boolean containsKey(String key) {
    if (!taskRunProperties.containsKey(key)) {
        return false;
    }
    return taskRunProperties.get(key) != null;
}

/**
 * Checks whether this execute option carries properties that need to be merged into a newer task run,
 * e.g. when it is an internal partition refresh task of a mv refresh.
 *
 * @return true if the task run properties hold a non-null value for {@code TaskRun.PARTITION_START},
 *         {@code TaskRun.PARTITION_END} or {@code TaskRun.START_TASK_RUN_ID}; false otherwise, including
 *         when there are no task run properties at all
 */
public boolean containsToMergeProperties() {
    if (taskRunProperties == null) {
        return false;
    }
    return containsKey(TaskRun.PARTITION_START)
            || containsKey(TaskRun.PARTITION_END)
            || containsKey(TaskRun.START_TASK_RUN_ID);
}

/**
 * Copies all task run properties of {@code option} into this option, overwriting values of keys
 * that exist on both sides. Nothing happens when {@code option} has no task run properties.
 *
 * @param option the execute option whose task run properties are merged into this one
 */
public void mergeProperties(ExecuteOption option) {
    if (option.taskRunProperties == null) {
        return;
    }
    // Create the target map only when there is actually something to merge.
    if (taskRunProperties == null) {
        taskRunProperties = Maps.newHashMap();
    }
    taskRunProperties.putAll(option.taskRunProperties);
}

@Override
public String toString() {
return GsonUtils.GSON.toJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.StmtExecutor;
import com.starrocks.scheduler.mv.MVPCTRefreshListPartitioner;
import com.starrocks.scheduler.mv.MVPCTRefreshNonPartitioner;
import com.starrocks.scheduler.mv.MVPCTRefreshPartitioner;
import com.starrocks.scheduler.mv.MVPCTRefreshPlanBuilder;
Expand Down Expand Up @@ -964,6 +965,8 @@ private MVPCTRefreshPartitioner buildMvRefreshPartitioner(MaterializedView mv, T
return new MVPCTRefreshNonPartitioner(mvContext, context, db, mv);
} else if (partitionInfo.isRangePartition()) {
return new MVPCTRefreshRangePartitioner(mvContext, context, db, mv);
} else if (partitionInfo.isListPartition()) {
return new MVPCTRefreshListPartitioner(mvContext, context, db, mv);
} else {
throw new DmlException(String.format("materialized view:%s in database:%s refresh failed: partition info %s not " +
"supported", mv.getName(), context.ctx.getDatabase(), partitionInfo));
Expand Down
Loading

0 comments on commit 98d9a98

Please sign in to comment.