Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] active mv automatically (backport #32829) #32981

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@

public class AlterJobMgr {
private static final Logger LOG = LogManager.getLogger(AlterJobMgr.class);
public static final String MANUAL_INACTIVE_MV_REASON = "user use alter materialized view set status to inactive";

private final SchemaChangeHandler schemaChangeHandler;
private final MaterializedViewHandler materializedViewHandler;
Expand Down Expand Up @@ -300,7 +301,7 @@ public void alterMaterializedViewStatus(MaterializedView materializedView, Strin
materializedView, baseTableInfos);
materializedView.setActive();
} else if (AlterMaterializedViewStatusClause.INACTIVE.equalsIgnoreCase(status)) {
materializedView.setInactiveAndReason("user use alter materialized view set status to inactive");
materializedView.setInactiveAndReason(MANUAL_INACTIVE_MV_REASON);
}
}

Expand Down
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2519,6 +2519,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = false)
public static int pipe_scheduler_interval_millis = 1000;

@ConfField(mutable = true)
public static long mv_active_checker_interval_seconds = 60;

/**
* To prevent the external catalog from displaying too many entries in the grantsTo system table,
* you can use this variable to ignore the entries in the external catalog
Expand Down
111 changes: 111 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/scheduler/MVActiveChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.starrocks.alter.AlterJobMgr;
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.Table;
import com.starrocks.common.Config;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatisticUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.Optional;

/**
* A daemon thread that check the MV active status, try to activate the MV it's inactive.
*/
public class MVActiveChecker extends FrontendDaemon {

private static final Logger LOG = LogManager.getLogger(MVActiveChecker.class);

public MVActiveChecker() {
super("MVActiveChecker", Config.mv_active_checker_interval_seconds * 1000);
}

@Override
protected void runAfterCatalogReady() {
// reset if the interval has been changed
setInterval(Config.mv_active_checker_interval_seconds * 1000L);

try {
process();
} catch (Throwable e) {
LOG.warn("Failed to process one round of MVActiveChecker", e);
}
}

@VisibleForTesting
public void runForTest() {
process();
}

private void process() {
Collection<Database> dbs = GlobalStateMgr.getCurrentState().getIdToDb().values();
for (Database db : CollectionUtils.emptyIfNull(dbs)) {
for (Table table : CollectionUtils.emptyIfNull(db.getTables())) {
if (table.isMaterializedView()) {
MaterializedView mv = (MaterializedView) table;
if (!mv.isActive()) {
tryToActivate(mv);
}
}
}
}
}

public static void tryToActivate(MaterializedView mv) {
// if the mv is set to inactive manually, we don't activate it
String reason = mv.getInactiveReason();
if (mv.isActive() || AlterJobMgr.MANUAL_INACTIVE_MV_REASON.equalsIgnoreCase(reason)) {
return;
}

long dbId = mv.getDbId();
Optional<String> dbName = GlobalStateMgr.getCurrentState().mayGetDb(dbId).map(Database::getFullName);
if (!dbName.isPresent()) {
LOG.warn("[MVActiveChecker] cannot activate MV {} since database {} not found", mv.getName(), dbId);
return;
}

String mvFullName = new TableName(dbName.get(), mv.getName()).toString();
String sql = String.format("ALTER MATERIALIZED VIEW %s active", mvFullName);
try {
ConnectContext connect = StatisticUtils.buildConnectContext();
connect.setStatisticsContext(false);
connect.setDatabase(dbName.get());

connect.executeSql(sql);
if (mv.isActive()) {
LOG.info("[MVActiveChecker] activate MV {} successfully", mvFullName);
} else {
LOG.warn("[MVActiveChecker] activate MV {} failed", mvFullName);
}
} catch (Exception e) {
LOG.warn("[MVActiveChecker] activate MV {} failed", mvFullName, e);
throw new RuntimeException(e);
} finally {
ConnectContext.remove();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,12 @@ private void prepare(TaskRunContext context) {
mvId, context.ctx.getDatabase()));
}
materializedView = (MaterializedView) table;

// try to activate the mv before refresh
if (!materializedView.isActive()) {
MVActiveChecker.tryToActivate(materializedView);
LOG.info("Activated the MV before refreshing: {}", materializedView.getName());
}
if (!materializedView.isActive()) {
String errorMsg = String.format("Materialized view: %s, id: %d is not active, " +
"skip sync partition and data with base tables", materializedView.getName(), mvId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
import com.starrocks.qe.scheduler.slot.SlotManager;
import com.starrocks.qe.scheduler.slot.SlotProvider;
import com.starrocks.rpc.FrontendServiceProxy;
import com.starrocks.scheduler.MVActiveChecker;
import com.starrocks.scheduler.TaskManager;
import com.starrocks.scheduler.mv.MVJobExecutor;
import com.starrocks.scheduler.mv.MaterializedViewMgr;
Expand Down Expand Up @@ -542,6 +543,7 @@ public class GlobalStateMgr {
private PipeManager pipeManager;
private PipeListener pipeListener;
private PipeScheduler pipeScheduler;
private MVActiveChecker mvActiveChecker;

private final ResourceUsageMonitor resourceUsageMonitor = new ResourceUsageMonitor();
private final SlotManager slotManager = new SlotManager(resourceUsageMonitor);
Expand Down Expand Up @@ -763,6 +765,7 @@ private GlobalStateMgr(boolean isCkptGlobalState) {
this.pipeManager = new PipeManager();
this.pipeListener = new PipeListener(this.pipeManager);
this.pipeScheduler = new PipeScheduler(this.pipeManager);
this.mvActiveChecker = new MVActiveChecker();

if (RunMode.getCurrentRunMode().isAllowCreateLakeTable()) {
this.storageVolumeMgr = new SharedDataStorageVolumeMgr();
Expand Down Expand Up @@ -1020,6 +1023,10 @@ public PipeListener getPipeListener() {
return pipeListener;
}

public MVActiveChecker getMvActiveChecker() {
return mvActiveChecker;
}

public ConnectorTblMetaInfoMgr getConnectorTblMetaInfoMgr() {
return connectorTblMetaInfoMgr;
}
Expand Down Expand Up @@ -1386,6 +1393,7 @@ private void startLeaderOnlyDaemonThreads() {
mvMVJobExecutor.start();
pipeListener.start();
pipeScheduler.start();
mvActiveChecker.start();

// start daemon thread to report the progress of RunningTaskRun to the follower by editlog
taskRunStateSynchronizer = new TaskRunStateSynchronizer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@
package com.starrocks.analysis;

import com.google.common.collect.ImmutableList;
import com.starrocks.alter.AlterJobMgr;
import com.starrocks.alter.AlterMVJobExecutor;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.common.AnalysisException;
import com.starrocks.qe.ConnectContext;
import com.starrocks.scheduler.MVActiveChecker;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.AnalyzeTestUtil;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.AlterMaterializedViewStmt;
import com.starrocks.sql.ast.AsyncRefreshSchemeDesc;
import com.starrocks.sql.ast.RefreshSchemeClause;
import com.starrocks.sql.ast.TableRenameClause;
import com.starrocks.sql.plan.PlanTestBase;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -61,6 +65,11 @@ public static void beforeClass() throws Exception {
" from t0 group by v1;\n");
}

@Before
public void before() {
connectContext.setThreadLocalInfo();
}

@Test
public void testRename() throws Exception {
String alterMvSql = "alter materialized view mv1 rename mv2;";
Expand Down Expand Up @@ -223,4 +232,52 @@ public void testAlterMVOnView() throws Exception {
Assert.assertTrue(mv.isActive());
Assert.assertNull(mv.getInactiveReason());
}

@Test
public void testActiveChecker() throws Exception {
PlanTestBase.mockDml();
MVActiveChecker checker = GlobalStateMgr.getCurrentState().getMvActiveChecker();
checker.setStop();

String baseTableName = "base_tbl_active";
String createTableSql =
"create table " + baseTableName + " ( k1 int, k2 int) properties('replication_num'='1')";
starRocksAssert.withTable(createTableSql);
starRocksAssert.withMaterializedView("create materialized view mv_active " +
" refresh manual as select * from base_tbl_active");
MaterializedView mv = (MaterializedView) starRocksAssert.getTable(connectContext.getDatabase(), "mv_active");
Assert.assertTrue(mv.isActive());

// drop the base table and try to activate it
starRocksAssert.dropTable(baseTableName);
Assert.assertFalse(mv.isActive());
Assert.assertEquals("base-table dropped: base_tbl_active", mv.getInactiveReason());
checker.runForTest();
Assert.assertFalse(mv.isActive());
Assert.assertEquals("base-table dropped: base_tbl_active", mv.getInactiveReason());

// create the table again, and activate it
connectContext.setThreadLocalInfo();
starRocksAssert.withTable(createTableSql);
checker.runForTest();
Assert.assertTrue(mv.isActive());

// activate before refresh
connectContext.setThreadLocalInfo();
starRocksAssert.dropTable(baseTableName);
starRocksAssert.withTable(createTableSql);
Assert.assertFalse(mv.isActive());
Thread.sleep(1000);
starRocksAssert.getCtx().executeSql("refresh materialized view " + mv.getName() + " with sync mode");
Assert.assertTrue(mv.isActive());

// manually set to inactive
mv.setInactiveAndReason(AlterJobMgr.MANUAL_INACTIVE_MV_REASON);
Assert.assertFalse(mv.isActive());
checker.runForTest();
Assert.assertFalse(mv.isActive());
Assert.assertEquals(AlterJobMgr.MANUAL_INACTIVE_MV_REASON, mv.getInactiveReason());

checker.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -457,23 +457,6 @@ public void testWithPartition() {
}
}

@Test
public void testInactive() {
Database testDb = GlobalStateMgr.getCurrentState().getDb("test");
MaterializedView materializedView = ((MaterializedView) testDb.getTable("mv_inactive"));
materializedView.setInactiveAndReason("");
Task task = TaskBuilder.buildMvTask(materializedView, testDb.getFullName());

TaskRun taskRun = TaskRunBuilder.newBuilder(task).build();
taskRun.initStatus(UUIDUtil.genUUID().toString(), System.currentTimeMillis());
try {
taskRun.executeTaskRun();
Assert.fail("should not be here. executeTaskRun will throw exception");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("is not active, skip sync partition and data with base tables"));
}
}

@Test
public void testMvWithoutPartition() {
Database testDb = GlobalStateMgr.getCurrentState().getDb("test");
Expand Down
48 changes: 48 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/sql/plan/PlanTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,26 @@

package com.starrocks.sql.plan;

import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.FeConstants;
import com.starrocks.qe.StmtExecutor;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.DmlStmt;
import com.starrocks.sql.ast.InsertStmt;
import mockit.Mock;
import mockit.MockUp;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.List;

public class PlanTestBase extends PlanTestNoneDBBase {
// use a unique dir so that it won't be conflict with other unit test which
@BeforeClass
Expand Down Expand Up @@ -1030,4 +1045,37 @@ public static void afterClass() {
connectContext.getSessionVariable().setEnableLowCardinalityOptimize(true);
connectContext.getSessionVariable().setEnableLocalShuffleAgg(true);
}

private static void setPartitionVersion(Partition partition, long version) {
partition.setVisibleVersion(version, System.currentTimeMillis());
MaterializedIndex baseIndex = partition.getBaseIndex();
List<Tablet> tablets = baseIndex.getTablets();
for (Tablet tablet : tablets) {
List<Replica> replicas = ((LocalTablet) tablet).getImmutableReplicas();
for (Replica replica : replicas) {
replica.updateVersionInfo(version, -1, version);
}
}
}

public static void mockDml() {
new MockUp<StmtExecutor>() {
@Mock
public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
if (stmt instanceof InsertStmt) {
InsertStmt insertStmt = (InsertStmt) stmt;
TableName tableName = insertStmt.getTableName();
Database testDb = GlobalStateMgr.getCurrentState().getDb("test");
OlapTable tbl = ((OlapTable) testDb.getTable(tableName.getTbl()));
if (tbl != null) {
for (Partition partition : tbl.getPartitions()) {
if (insertStmt.getTargetPartitionIds().contains(partition.getId())) {
setPartitionVersion(partition, partition.getVisibleVersion() + 1);
}
}
}
}
}
};
}
}
Loading