Skip to content

Commit

Permalink
activate before refreshing
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork committed Oct 17, 2023
1 parent cb057bb commit 98aeef2
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private void process() {
}
}

private void tryToActivate(MaterializedView mv) {
public static void tryToActivate(MaterializedView mv) {
// if the mv is set to inactive manually, we don't activate it
String reason = mv.getInactiveReason();
if (mv.isActive() || AlterJobMgr.MANUAL_INACTIVE_MV_REASON.equalsIgnoreCase(reason)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,12 @@ private void prepare(TaskRunContext context) {
mvId, context.ctx.getDatabase()));
}
materializedView = (MaterializedView) table;

// try to activate the mv before refresh
if (!materializedView.isActive()) {
MVActiveChecker.tryToActivate(materializedView);
LOG.info("Activated the MV before refreshing: {}", materializedView.getName());
}
if (!materializedView.isActive()) {
String errorMsg = String.format("Materialized view: %s, id: %d is not active, " +
"skip sync partition and data with base tables", materializedView.getName(), mvId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.starrocks.sql.ast.AsyncRefreshSchemeDesc;
import com.starrocks.sql.ast.RefreshSchemeClause;
import com.starrocks.sql.ast.TableRenameClause;
import com.starrocks.sql.plan.PlanTestBase;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.Assert;
Expand Down Expand Up @@ -234,6 +235,7 @@ public void testAlterMVOnView() throws Exception {

@Test
public void testActiveChecker() throws Exception {
PlanTestBase.mockDml();
MVActiveChecker checker = GlobalStateMgr.getCurrentState().getMvActiveChecker();
checker.setStop();

Expand All @@ -242,7 +244,7 @@ public void testActiveChecker() throws Exception {
"create table " + baseTableName + " ( k1 int, k2 int) properties('replication_num'='1')";
starRocksAssert.withTable(createTableSql);
starRocksAssert.withMaterializedView("create materialized view mv_active " +
" refresh async as select * from base_tbl_active");
" refresh manual as select * from base_tbl_active");
MaterializedView mv = (MaterializedView) starRocksAssert.getTable(connectContext.getDatabase(), "mv_active");
Assert.assertTrue(mv.isActive());

Expand All @@ -260,6 +262,15 @@ public void testActiveChecker() throws Exception {
checker.runForTest();
Assert.assertTrue(mv.isActive());

// activate before refresh
connectContext.setThreadLocalInfo();
starRocksAssert.dropTable(baseTableName);
starRocksAssert.withTable(createTableSql);
Assert.assertFalse(mv.isActive());
Thread.sleep(1000);
starRocksAssert.getCtx().executeSql("refresh materialized view " + mv.getName() + " with sync mode");
Assert.assertTrue(mv.isActive());

// manually set to inactive
mv.setInactiveAndReason(AlterJobMgr.MANUAL_INACTIVE_MV_REASON);
Assert.assertFalse(mv.isActive());
Expand Down
48 changes: 48 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/sql/plan/PlanTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,26 @@

package com.starrocks.sql.plan;

import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.FeConstants;
import com.starrocks.qe.StmtExecutor;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.DmlStmt;
import com.starrocks.sql.ast.InsertStmt;
import mockit.Mock;
import mockit.MockUp;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.List;

public class PlanTestBase extends PlanTestNoneDBBase {
// use a unique dir so that it won't be conflict with other unit test which
@BeforeClass
Expand Down Expand Up @@ -1030,4 +1045,37 @@ public static void afterClass() {
connectContext.getSessionVariable().setEnableLowCardinalityOptimize(true);
connectContext.getSessionVariable().setEnableLocalShuffleAgg(true);
}

private static void setPartitionVersion(Partition partition, long version) {
partition.setVisibleVersion(version, System.currentTimeMillis());
MaterializedIndex baseIndex = partition.getBaseIndex();
List<Tablet> tablets = baseIndex.getTablets();
for (Tablet tablet : tablets) {
List<Replica> replicas = ((LocalTablet) tablet).getImmutableReplicas();
for (Replica replica : replicas) {
replica.updateVersionInfo(version, -1, version);
}
}
}

public static void mockDml() {
new MockUp<StmtExecutor>() {
@Mock
public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
if (stmt instanceof InsertStmt) {
InsertStmt insertStmt = (InsertStmt) stmt;
TableName tableName = insertStmt.getTableName();
Database testDb = GlobalStateMgr.getCurrentState().getDb("test");
OlapTable tbl = ((OlapTable) testDb.getTable(tableName.getTbl()));
if (tbl != null) {
for (Partition partition : tbl.getPartitions()) {
if (insertStmt.getTargetPartitionIds().contains(partition.getId())) {
setPartitionVersion(partition, partition.getVisibleVersion() + 1);
}
}
}
}
}
};
}
}

0 comments on commit 98aeef2

Please sign in to comment.