Skip to content

Commit

Permalink
[Enhancement] active mv automatically (#32829)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
(cherry picked from commit c1e3a1e)
  • Loading branch information
murphyatwork authored and wanpengfei-git committed Oct 17, 2023
1 parent 22d29fb commit 6c56ec2
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@

public class AlterJobMgr {
private static final Logger LOG = LogManager.getLogger(AlterJobMgr.class);
public static final String MANUAL_INACTIVE_MV_REASON = "user use alter materialized view set status to inactive";

private final SchemaChangeHandler schemaChangeHandler;
private final MaterializedViewHandler materializedViewHandler;
Expand Down Expand Up @@ -300,7 +301,7 @@ public void alterMaterializedViewStatus(MaterializedView materializedView, Strin
materializedView, baseTableInfos);
materializedView.setActive();
} else if (AlterMaterializedViewStatusClause.INACTIVE.equalsIgnoreCase(status)) {
materializedView.setInactiveAndReason("user use alter materialized view set status to inactive");
materializedView.setInactiveAndReason(MANUAL_INACTIVE_MV_REASON);
}
}

Expand Down
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2519,6 +2519,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = false)
public static int pipe_scheduler_interval_millis = 1000;

@ConfField(mutable = true)
public static long mv_active_checker_interval_seconds = 60;

/**
* To prevent the external catalog from displaying too many entries in the grantsTo system table,
* you can use this variable to ignore the entries in the external catalog
Expand Down
111 changes: 111 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/scheduler/MVActiveChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.starrocks.alter.AlterJobMgr;
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.Table;
import com.starrocks.common.Config;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatisticUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.Optional;

/**
* A daemon thread that check the MV active status, try to activate the MV it's inactive.
*/
public class MVActiveChecker extends FrontendDaemon {

private static final Logger LOG = LogManager.getLogger(MVActiveChecker.class);

public MVActiveChecker() {
super("MVActiveChecker", Config.mv_active_checker_interval_seconds * 1000);
}

@Override
protected void runAfterCatalogReady() {
// reset if the interval has been changed
setInterval(Config.mv_active_checker_interval_seconds * 1000L);

try {
process();
} catch (Throwable e) {
LOG.warn("Failed to process one round of MVActiveChecker", e);
}
}

@VisibleForTesting
public void runForTest() {
process();
}

private void process() {
Collection<Database> dbs = GlobalStateMgr.getCurrentState().getIdToDb().values();
for (Database db : CollectionUtils.emptyIfNull(dbs)) {
for (Table table : CollectionUtils.emptyIfNull(db.getTables())) {
if (table.isMaterializedView()) {
MaterializedView mv = (MaterializedView) table;
if (!mv.isActive()) {
tryToActivate(mv);
}
}
}
}
}

public static void tryToActivate(MaterializedView mv) {
// if the mv is set to inactive manually, we don't activate it
String reason = mv.getInactiveReason();
if (mv.isActive() || AlterJobMgr.MANUAL_INACTIVE_MV_REASON.equalsIgnoreCase(reason)) {
return;
}

long dbId = mv.getDbId();
Optional<String> dbName = GlobalStateMgr.getCurrentState().mayGetDb(dbId).map(Database::getFullName);
if (!dbName.isPresent()) {
LOG.warn("[MVActiveChecker] cannot activate MV {} since database {} not found", mv.getName(), dbId);
return;
}

String mvFullName = new TableName(dbName.get(), mv.getName()).toString();
String sql = String.format("ALTER MATERIALIZED VIEW %s active", mvFullName);
try {
ConnectContext connect = StatisticUtils.buildConnectContext();
connect.setStatisticsContext(false);
connect.setDatabase(dbName.get());

connect.executeSql(sql);
if (mv.isActive()) {
LOG.info("[MVActiveChecker] activate MV {} successfully", mvFullName);
} else {
LOG.warn("[MVActiveChecker] activate MV {} failed", mvFullName);
}
} catch (Exception e) {
LOG.warn("[MVActiveChecker] activate MV {} failed", mvFullName, e);
throw new RuntimeException(e);
} finally {
ConnectContext.remove();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,12 @@ private void prepare(TaskRunContext context) {
mvId, context.ctx.getDatabase()));
}
materializedView = (MaterializedView) table;

// try to activate the mv before refresh
if (!materializedView.isActive()) {
MVActiveChecker.tryToActivate(materializedView);
LOG.info("Activated the MV before refreshing: {}", materializedView.getName());
}
if (!materializedView.isActive()) {
String errorMsg = String.format("Materialized view: %s, id: %d is not active, " +
"skip sync partition and data with base tables", materializedView.getName(), mvId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
import com.starrocks.qe.scheduler.slot.SlotManager;
import com.starrocks.qe.scheduler.slot.SlotProvider;
import com.starrocks.rpc.FrontendServiceProxy;
import com.starrocks.scheduler.MVActiveChecker;
import com.starrocks.scheduler.TaskManager;
import com.starrocks.scheduler.mv.MVJobExecutor;
import com.starrocks.scheduler.mv.MaterializedViewMgr;
Expand Down Expand Up @@ -542,6 +543,7 @@ public class GlobalStateMgr {
private PipeManager pipeManager;
private PipeListener pipeListener;
private PipeScheduler pipeScheduler;
private MVActiveChecker mvActiveChecker;

private final ResourceUsageMonitor resourceUsageMonitor = new ResourceUsageMonitor();
private final SlotManager slotManager = new SlotManager(resourceUsageMonitor);
Expand Down Expand Up @@ -763,6 +765,7 @@ private GlobalStateMgr(boolean isCkptGlobalState) {
this.pipeManager = new PipeManager();
this.pipeListener = new PipeListener(this.pipeManager);
this.pipeScheduler = new PipeScheduler(this.pipeManager);
this.mvActiveChecker = new MVActiveChecker();

if (RunMode.getCurrentRunMode().isAllowCreateLakeTable()) {
this.storageVolumeMgr = new SharedDataStorageVolumeMgr();
Expand Down Expand Up @@ -1020,6 +1023,10 @@ public PipeListener getPipeListener() {
return pipeListener;
}

public MVActiveChecker getMvActiveChecker() {
return mvActiveChecker;
}

public ConnectorTblMetaInfoMgr getConnectorTblMetaInfoMgr() {
return connectorTblMetaInfoMgr;
}
Expand Down Expand Up @@ -1386,6 +1393,7 @@ private void startLeaderOnlyDaemonThreads() {
mvMVJobExecutor.start();
pipeListener.start();
pipeScheduler.start();
mvActiveChecker.start();

// start daemon thread to report the progress of RunningTaskRun to the follower by editlog
taskRunStateSynchronizer = new TaskRunStateSynchronizer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@
package com.starrocks.analysis;

import com.google.common.collect.ImmutableList;
import com.starrocks.alter.AlterJobMgr;
import com.starrocks.alter.AlterMVJobExecutor;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.common.AnalysisException;
import com.starrocks.qe.ConnectContext;
import com.starrocks.scheduler.MVActiveChecker;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.AnalyzeTestUtil;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.AlterMaterializedViewStmt;
import com.starrocks.sql.ast.AsyncRefreshSchemeDesc;
import com.starrocks.sql.ast.RefreshSchemeClause;
import com.starrocks.sql.ast.TableRenameClause;
import com.starrocks.sql.plan.PlanTestBase;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -61,6 +65,11 @@ public static void beforeClass() throws Exception {
" from t0 group by v1;\n");
}

@Before
public void before() {
connectContext.setThreadLocalInfo();
}

@Test
public void testRename() throws Exception {
String alterMvSql = "alter materialized view mv1 rename mv2;";
Expand Down Expand Up @@ -223,4 +232,52 @@ public void testAlterMVOnView() throws Exception {
Assert.assertTrue(mv.isActive());
Assert.assertNull(mv.getInactiveReason());
}

@Test
public void testActiveChecker() throws Exception {
PlanTestBase.mockDml();
MVActiveChecker checker = GlobalStateMgr.getCurrentState().getMvActiveChecker();
checker.setStop();

String baseTableName = "base_tbl_active";
String createTableSql =
"create table " + baseTableName + " ( k1 int, k2 int) properties('replication_num'='1')";
starRocksAssert.withTable(createTableSql);
starRocksAssert.withMaterializedView("create materialized view mv_active " +
" refresh manual as select * from base_tbl_active");
MaterializedView mv = (MaterializedView) starRocksAssert.getTable(connectContext.getDatabase(), "mv_active");
Assert.assertTrue(mv.isActive());

// drop the base table and try to activate it
starRocksAssert.dropTable(baseTableName);
Assert.assertFalse(mv.isActive());
Assert.assertEquals("base-table dropped: base_tbl_active", mv.getInactiveReason());
checker.runForTest();
Assert.assertFalse(mv.isActive());
Assert.assertEquals("base-table dropped: base_tbl_active", mv.getInactiveReason());

// create the table again, and activate it
connectContext.setThreadLocalInfo();
starRocksAssert.withTable(createTableSql);
checker.runForTest();
Assert.assertTrue(mv.isActive());

// activate before refresh
connectContext.setThreadLocalInfo();
starRocksAssert.dropTable(baseTableName);
starRocksAssert.withTable(createTableSql);
Assert.assertFalse(mv.isActive());
Thread.sleep(1000);
starRocksAssert.getCtx().executeSql("refresh materialized view " + mv.getName() + " with sync mode");
Assert.assertTrue(mv.isActive());

// manually set to inactive
mv.setInactiveAndReason(AlterJobMgr.MANUAL_INACTIVE_MV_REASON);
Assert.assertFalse(mv.isActive());
checker.runForTest();
Assert.assertFalse(mv.isActive());
Assert.assertEquals(AlterJobMgr.MANUAL_INACTIVE_MV_REASON, mv.getInactiveReason());

checker.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -459,23 +459,6 @@ public void testWithPartition() {
}
}

@Test
public void testInactive() {
Database testDb = GlobalStateMgr.getCurrentState().getDb("test");
MaterializedView materializedView = ((MaterializedView) testDb.getTable("mv_inactive"));
materializedView.setInactiveAndReason("");
Task task = TaskBuilder.buildMvTask(materializedView, testDb.getFullName());

TaskRun taskRun = TaskRunBuilder.newBuilder(task).build();
taskRun.initStatus(UUIDUtil.genUUID().toString(), System.currentTimeMillis());
try {
taskRun.executeTaskRun();
Assert.fail("should not be here. executeTaskRun will throw exception");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("is not active, skip sync partition and data with base tables"));
}
}

@Test
public void testMvWithoutPartition() {
Database testDb = GlobalStateMgr.getCurrentState().getDb("test");
Expand Down
48 changes: 48 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/sql/plan/PlanTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,26 @@

package com.starrocks.sql.plan;

import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.FeConstants;
import com.starrocks.qe.StmtExecutor;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.DmlStmt;
import com.starrocks.sql.ast.InsertStmt;
import mockit.Mock;
import mockit.MockUp;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.List;

public class PlanTestBase extends PlanTestNoneDBBase {
// use a unique dir so that it won't be conflict with other unit test which
@BeforeClass
Expand Down Expand Up @@ -1030,4 +1045,37 @@ public static void afterClass() {
connectContext.getSessionVariable().setEnableLowCardinalityOptimize(true);
connectContext.getSessionVariable().setEnableLocalShuffleAgg(true);
}

private static void setPartitionVersion(Partition partition, long version) {
partition.setVisibleVersion(version, System.currentTimeMillis());
MaterializedIndex baseIndex = partition.getBaseIndex();
List<Tablet> tablets = baseIndex.getTablets();
for (Tablet tablet : tablets) {
List<Replica> replicas = ((LocalTablet) tablet).getImmutableReplicas();
for (Replica replica : replicas) {
replica.updateVersionInfo(version, -1, version);
}
}
}

public static void mockDml() {
new MockUp<StmtExecutor>() {
@Mock
public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
if (stmt instanceof InsertStmt) {
InsertStmt insertStmt = (InsertStmt) stmt;
TableName tableName = insertStmt.getTableName();
Database testDb = GlobalStateMgr.getCurrentState().getDb("test");
OlapTable tbl = ((OlapTable) testDb.getTable(tableName.getTbl()));
if (tbl != null) {
for (Partition partition : tbl.getPartitions()) {
if (insertStmt.getTargetPartitionIds().contains(partition.getId())) {
setPartitionVersion(partition, partition.getVisibleVersion() + 1);
}
}
}
}
}
};
}
}

0 comments on commit 6c56ec2

Please sign in to comment.