Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] List Partition For AMV(Part 2): Support list partition for asynchronous materialized view with non-nullable partition columns #46680

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public void addListPartitionKeys(String partitionName,
nameToPartKeys.put(partitionName, listPartitionKey);
}

/**
 * Bulk variant of {@code addListPartitionKeys(String, PListCell)}: copies every entry of the given map
 * into {@code nameToPartKeys} with a single {@code putAll} call.
 *
 * @param listPartitionKeys partition name -> list partition cell entries to record; passed to
 *                          {@code putAll} as-is, without any null check
 */
public void addListPartitionKeys(Map<String, PListCell> listPartitionKeys) {
nameToPartKeys.putAll(listPartitionKeys);
}

/**
* Get the partition name with its associated range partition key when the mv is range partitioned.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.Range;
import com.starrocks.analysis.Expr;
import com.starrocks.catalog.mv.MVTimelinessArbiter;
import com.starrocks.catalog.mv.MVTimelinessListPartitionArbiter;
import com.starrocks.catalog.mv.MVTimelinessNonPartitionArbiter;
import com.starrocks.catalog.mv.MVTimelinessRangePartitionArbiter;
import com.starrocks.common.AnalysisException;
Expand Down Expand Up @@ -104,6 +105,8 @@ public static MVTimelinessArbiter buildMVTimelinessArbiter(MaterializedView mv,
return new MVTimelinessNonPartitionArbiter(mv, isQueryRewrite);
} else if (partitionInfo.isRangePartition()) {
return new MVTimelinessRangePartitionArbiter(mv, isQueryRewrite);
} else if (partitionInfo.isListPartition()) {
return new MVTimelinessListPartitionArbiter(mv, isQueryRewrite);
} else {
throw UnsupportedException.unsupportedException("unsupported partition info type:" +
partitionInfo.getClass().getName());
Expand All @@ -114,10 +117,10 @@ public static MVTimelinessArbiter buildMVTimelinessArbiter(MaterializedView mv,
* Check whether mv needs to refresh based on the ref base table. It's a shortcut version of getMvBaseTableUpdateInfo.
* @return Optional<Boolean> : true if it needs to refresh, false if not, empty if the result is unknown.
*/
public static Optional<Boolean> needsToRefreshTable(MaterializedView mv,
Table baseTable,
boolean withMv,
boolean isQueryRewrite) {
private static Optional<Boolean> needsToRefreshTable(MaterializedView mv,
Table baseTable,
boolean withMv,
boolean isQueryRewrite) {
if (baseTable.isView()) {
// do nothing
return Optional.of(false);
Expand Down Expand Up @@ -195,9 +198,7 @@ public static MvBaseTableUpdateInfo getMvBaseTableUpdateInfo(MaterializedView mv
if (mvPartitionInfo.isListPartition()) {
Map<String, PListCell> partitionNameWithRange = getMVPartitionNameWithList(baseTable,
partitionColumn, updatedPartitionNamesList);
for (Map.Entry<String, PListCell> e : partitionNameWithRange.entrySet()) {
baseTableUpdateInfo.addListPartitionKeys(e.getKey(), e.getValue());
}
baseTableUpdateInfo.addListPartitionKeys(partitionNameWithRange);
baseTableUpdateInfo.addToRefreshPartitionNames(partitionNameWithRange.keySet());
} else if (mvPartitionInfo.isRangePartition()) {
Expr partitionExpr = MaterializedView.getPartitionExpr(mv);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

package com.starrocks.catalog.mv;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvBaseTableUpdateInfo;
import com.starrocks.catalog.MvUpdateInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TableProperty;
Expand All @@ -29,6 +31,7 @@
import java.util.Map;
import java.util.Set;

import static com.starrocks.catalog.MvRefreshArbiter.getMvBaseTableUpdateInfo;
import static com.starrocks.catalog.MvRefreshArbiter.needsToRefreshTable;

/**
Expand Down Expand Up @@ -147,4 +150,22 @@ protected Set<String> getMVToRefreshPartitionNames(
}
return needRefreshMvPartitionNames;
}

/**
 * Gathers, for every ref base table of the mv, the names of its partitions that need to be refreshed.
 * As a side effect, each base table's {@link MvBaseTableUpdateInfo} is stored into
 * {@code mvUpdateInfo.getBaseTableUpdateInfos()}.
 *
 * @param refBaseTableAndColumns ref base tables of the mv mapped to their ref columns; only the tables are read
 * @param mvUpdateInfo update info of the mv that receives each base table's update info
 * @return ref base table -> names of its partitions to refresh
 */
protected Map<Table, Set<String>> collectBaseTableUpdatePartitionNames(Map<Table, Column> refBaseTableAndColumns,
                                                                       MvUpdateInfo mvUpdateInfo) {
    final Map<Table, Set<String>> changedPartitionsByTable = Maps.newHashMap();
    // The ref columns are not needed here, so walk the tables only.
    for (Table refTable : refBaseTableAndColumns.keySet()) {
        final MvBaseTableUpdateInfo tableUpdateInfo = getMvBaseTableUpdateInfo(mv, refTable, true, true);
        mvUpdateInfo.getBaseTableUpdateInfos().put(refTable, tableUpdateInfo);
        changedPartitionsByTable.put(refTable, tableUpdateInfo.getToRefreshPartitionNames());
    }
    return changedPartitionsByTable;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.catalog.mv;

import com.google.common.base.Preconditions;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.ListPartitionInfo;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvUpdateInfo;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TableProperty;
import com.starrocks.common.AnalysisException;
import com.starrocks.sql.common.ListPartitionDiff;
import com.starrocks.sql.common.ListPartitionDiffResult;
import com.starrocks.sql.common.ListPartitionDiffer;
import com.starrocks.sql.common.PListCell;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVPrepare;

/**
 * {@link MVTimelinessArbiter} for materialized views whose partition info is a {@link ListPartitionInfo}.
 * It decides which mv partitions have to be refreshed by diffing the mv's list partitions against the
 * list partitions of its ref base tables.
 */
public final class MVTimelinessListPartitionArbiter extends MVTimelinessArbiter {
    private static final Logger LOG = LogManager.getLogger(MVTimelinessListPartitionArbiter.class);

    public MVTimelinessListPartitionArbiter(MaterializedView mv, boolean isQueryRewrite) {
        super(mv, isQueryRewrite);
    }

    /**
     * Computes the mv's update info in checked mode.
     *
     * @return a FULL update info when non-ref base tables require a refresh or the list partition diff
     *         cannot be computed; otherwise a PARTIAL update info holding the mv partitions to refresh and
     *         the partition name mappings between the mv and its ref base tables
     * @throws AnalysisException declared by the overridden contract
     * @throws RuntimeException if the mv has no ref base table; the mv is marked inactive before throwing
     */
    @Override
    public MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisException {
        PartitionInfo partitionInfo = mv.getPartitionInfo();
        Preconditions.checkState(partitionInfo instanceof ListPartitionInfo);
        Map<Table, Column> refBaseTableAndColumns = mv.getRelatedPartitionTableAndColumn();
        if (refBaseTableAndColumns.isEmpty()) {
            mv.setInactiveAndReason("partition configuration changed");
            LOG.warn("mark mv:{} inactive for get partition info failed", mv.getName());
            throw new RuntimeException(String.format("getting partition info failed for mv: %s", mv.getName()));
        }

        // if it needs to refresh based on non-ref base tables, return full refresh directly.
        boolean isRefreshBasedOnNonRefTables = needsRefreshOnNonRefBaseTables(refBaseTableAndColumns);
        logMVPrepare(mv, "Is refresh based on non-ref base table:{}", isRefreshBasedOnNonRefTables);
        if (isRefreshBasedOnNonRefTables) {
            return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
        }

        MvUpdateInfo mvTimelinessInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL);
        ListPartitionDiffResult result = ListPartitionDiffer.computeListPartitionDiff(mv);
        if (result == null) {
            logMVPrepare(mv, "Partitioned mv compute list diff failed");
            return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
        }

        final Map<Table, Map<String, PListCell>> refBaseTablePartitionMap = result.refBaseTablePartitionMap;
        // update into mv's to refresh partitions
        Set<String> mvToRefreshPartitionNames = mvTimelinessInfo.getMvToRefreshPartitionNames();
        final ListPartitionDiff listPartitionDiff = result.listPartitionDiff;
        mvToRefreshPartitionNames.addAll(listPartitionDiff.getDeletes().keySet());
        // remove ref base table's deleted partitions from `mvPartitionMap`
        // NOTE(review): the map returned by getListPartitionItems() is mutated below; confirm it is a copy
        // and not the mv's internal state.
        Map<String, PListCell> mvPartitionNameToListMap = mv.getListPartitionItems();
        listPartitionDiff.getDeletes().keySet().forEach(mvPartitionNameToListMap::remove);
        // refresh ref base table's new added partitions
        mvToRefreshPartitionNames.addAll(listPartitionDiff.getAdds().keySet());
        mvPartitionNameToListMap.putAll(listPartitionDiff.getAdds());

        // build the partition name mappings in both directions (base -> mv and mv -> base)
        final Map<Table, List<Integer>> refBaseTableRefIdxMap = result.refBaseTableRefIdxMap;
        Map<Table, Map<String, Set<String>>> baseToMvNameRef = ListPartitionDiffer
                .generateBaseRefMap(refBaseTablePartitionMap, refBaseTableRefIdxMap, mvPartitionNameToListMap);
        Map<String, Map<Table, Set<String>>> mvToBaseNameRef = ListPartitionDiffer
                .generateMvRefMap(mvPartitionNameToListMap, refBaseTableRefIdxMap, refBaseTablePartitionMap);
        mvTimelinessInfo.getBasePartToMvPartNames().putAll(baseToMvNameRef);
        mvTimelinessInfo.getMvPartToBasePartNames().putAll(mvToBaseNameRef);

        // update mv's to refresh partitions based on base table's partition changes
        Map<Table, Set<String>> baseChangedPartitionNames = collectBaseTableUpdatePartitionNames(refBaseTableAndColumns,
                mvTimelinessInfo);
        mvToRefreshPartitionNames.addAll(getMVToRefreshPartitionNames(baseChangedPartitionNames, baseToMvNameRef));

        return mvTimelinessInfo;
    }

    /**
     * Computes the mv's update info in loose mode: only the partitions that the list partition diff
     * reports as added are marked to be refreshed.
     *
     * @return a PARTIAL update info in LOOSE consistency mode holding the added partitions; a FULL update
     *         info if the diff result is null; {@code null} if computing the diff throws or the diff itself
     *         is null
     */
    @Override
    public MvUpdateInfo getMVTimelinessUpdateInfoInLoose() {
        MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL,
                TableProperty.QueryRewriteConsistencyMode.LOOSE);
        ListPartitionDiff listPartitionDiff = null;
        try {
            ListPartitionDiffResult result = ListPartitionDiffer.computeListPartitionDiff(mv);
            if (result == null) {
                logMVPrepare(mv, "Partitioned mv compute list diff failed");
                return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
            }
            listPartitionDiff = result.listPartitionDiff;
        } catch (Exception e) {
            LOG.warn("Materialized view compute partition difference with base table failed.", e);
            // NOTE(review): callers must handle this null; the null-result branch above returns FULL instead.
            return null;
        }
        if (listPartitionDiff == null) {
            LOG.warn("Materialized view compute partition difference with base table failed, the diff of list partition" +
                    " is null.");
            return null;
        }
        // only the newly added partitions need to be refreshed in loose mode
        mvUpdateInfo.getMvToRefreshPartitionNames().addAll(listPartitionDiff.getAdds().keySet());
        return mvUpdateInfo;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
package com.starrocks.catalog.mv;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.FunctionCallExpr;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvBaseTableUpdateInfo;
import com.starrocks.catalog.MvUpdateInfo;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionKey;
Expand All @@ -42,7 +40,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.catalog.MvRefreshArbiter.getMvBaseTableUpdateInfo;
import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVPrepare;

/**
Expand Down Expand Up @@ -77,7 +74,7 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExcep
}

// There may be a performance issue here, because it will fetch all partitions of base tables and mv partitions.
RangePartitionDiffResult differ = RangePartitionDiffer.computeRangePartitionDiff(mv);
RangePartitionDiffResult differ = RangePartitionDiffer.computeRangePartitionDiff(mv, null, true);
if (differ == null) {
throw new AnalysisException(String.format("Compute partition difference of mv %s with base table failed.",
mv.getName()));
Expand Down Expand Up @@ -130,32 +127,14 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExcep
return mvTimelinessInfo;
}

/**
 * Collects the changed partition names of each ref base table of the mv and records every base table's
 * {@link MvBaseTableUpdateInfo} into {@code mvUpdateInfo.getBaseTableUpdateInfos()} along the way.
 *
 * @param refBaseTableAndColumns ref base table and columns of mv; only the tables are read
 * @param mvUpdateInfo mv update info that receives each base table's update info
 * @return ref base table's changed partition names, keyed by base table
 */
private Map<Table, Set<String>> collectBaseTableUpdatePartitionNames(Map<Table, Column> refBaseTableAndColumns,
                                                                     MvUpdateInfo mvUpdateInfo) {
    final Map<Table, Set<String>> partitionNamesByBaseTable = Maps.newHashMap();
    for (Table table : refBaseTableAndColumns.keySet()) {
        final MvBaseTableUpdateInfo updateInfo = getMvBaseTableUpdateInfo(mv, table, true, true);
        mvUpdateInfo.getBaseTableUpdateInfos().put(table, updateInfo);
        partitionNamesByBaseTable.put(table, updateInfo.getToRefreshPartitionNames());
    }
    return partitionNamesByBaseTable;
}

@Override
protected MvUpdateInfo getMVTimelinessUpdateInfoInLoose() {
MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL,
TableProperty.QueryRewriteConsistencyMode.LOOSE);
RangePartitionDiff rangePartitionDiff = null;
try {
// There may be a performance issue here, because it will fetch all partitions of base tables and mv partitions.
RangePartitionDiffResult differ = RangePartitionDiffer.computeRangePartitionDiff(mv);
RangePartitionDiffResult differ = RangePartitionDiffer.computeRangePartitionDiff(mv, null, true);
if (differ == null) {
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.UNKNOWN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,36 @@ public void setReplay(boolean replay) {
isReplay = replay;
}

/**
 * Tells whether {@code taskRunProperties} holds a non-null value for the given key.
 * {@code taskRunProperties} is dereferenced without a null check, so callers must check it first.
 *
 * @param key property name to look up
 * @return true if the key is present and mapped to a non-null value
 */
private boolean containsKey(String key) {
    if (!taskRunProperties.containsKey(key)) {
        return false;
    }
    return taskRunProperties.get(key) != null;
}

/**
 * Checks whether this execute option carries properties that need to be merged into a newer task run,
 * e.g. the properties of an internal partition refresh task in mv refresh.
 *
 * @return true if {@code taskRunProperties} is not null and holds a non-null value for
 *         {@code TaskRun.PARTITION_START}, {@code TaskRun.PARTITION_END} or
 *         {@code TaskRun.START_TASK_RUN_ID}; false otherwise
 */
public boolean containsToMergeProperties() {
    return taskRunProperties != null
            && (containsKey(TaskRun.PARTITION_START)
            || containsKey(TaskRun.PARTITION_END)
            || containsKey(TaskRun.START_TASK_RUN_ID));
}

/**
 * Copies all task run properties of {@code option} into this execute option via {@code putAll}.
 * Nothing happens when {@code option} has no task run properties; this option's property map is
 * created on demand when it is still null.
 *
 * @param option execute option whose task run properties are merged into this one; must not be null
 */
public void mergeProperties(ExecuteOption option) {
    if (option.taskRunProperties == null) {
        return;
    }
    if (taskRunProperties == null) {
        taskRunProperties = Maps.newHashMap();
    }
    taskRunProperties.putAll(option.taskRunProperties);
}

@Override
public String toString() {
return GsonUtils.GSON.toJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.StmtExecutor;
import com.starrocks.scheduler.mv.MVPCTMetaRepairer;
import com.starrocks.scheduler.mv.MVPCTRefreshListPartitioner;
import com.starrocks.scheduler.mv.MVPCTRefreshNonPartitioner;
import com.starrocks.scheduler.mv.MVPCTRefreshPartitioner;
import com.starrocks.scheduler.mv.MVPCTRefreshPlanBuilder;
Expand Down Expand Up @@ -997,6 +998,8 @@ private MVPCTRefreshPartitioner buildMvRefreshPartitioner(MaterializedView mv, T
return new MVPCTRefreshNonPartitioner(mvContext, context, db, mv);
} else if (partitionInfo.isRangePartition()) {
return new MVPCTRefreshRangePartitioner(mvContext, context, db, mv);
} else if (partitionInfo.isListPartition()) {
return new MVPCTRefreshListPartitioner(mvContext, context, db, mv);
} else {
throw new DmlException(String.format("materialized view:%s in database:%s refresh failed: partition info %s not " +
"supported", mv.getName(), context.ctx.getDatabase(), partitionInfo));
Expand Down
Loading
Loading