Skip to content

Commit

Permalink
[Enhancement] auto change replication_num of system tables (backport #…
Browse files Browse the repository at this point in the history
…51799) (#51886)

Signed-off-by: Murphy <[email protected]>
Co-authored-by: Murphy <[email protected]>
Co-authored-by: Murphy <[email protected]>
  • Loading branch information
3 people authored Nov 9, 2024
1 parent 69331b2 commit d0d01da
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.starrocks.catalog.CatalogUtils;
import com.starrocks.common.UserException;
import com.starrocks.common.util.AutoInferUtil;
import com.starrocks.load.pipe.PipeFileRecord;
import com.starrocks.statistic.StatsConstants;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -64,7 +63,7 @@ public class FileListTableRepo extends FileListRepo {
"properties('replication_num' = '%d') ";

protected static final String CORRECT_FILE_LIST_REPLICATION_NUM =
"ALTER TABLE %s SET ('replication_num'='3')";
"ALTER TABLE %s SET ('replication_num'='%d')";

protected static final String ALL_COLUMNS =
"`pipe_id`, `file_name`, `file_version`, `file_size`, `state`, `last_modified`, `staged_time`," +
Expand Down Expand Up @@ -155,15 +154,14 @@ public void destroy() {
*/
static class SQLBuilder {

public static String buildCreateTableSql() throws UserException {
int replica = AutoInferUtil.calDefaultReplicationNum();
public static String buildCreateTableSql(int replicationNum) throws UserException {
return String.format(FILE_LIST_TABLE_CREATE,
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replica);
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replicationNum);
}

public static String buildAlterTableSql() {
public static String buildAlterTableSql(int replicationNum) {
return String.format(CORRECT_FILE_LIST_REPLICATION_NUM,
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME));
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replicationNum);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

package com.starrocks.load.pipe.filelist;

import com.starrocks.catalog.OlapTable;
import com.starrocks.common.UserException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatisticUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -30,7 +30,6 @@ public class RepoCreator {

private static boolean databaseExists = false;
private static boolean tableExists = false;
private static boolean tableCorrected = false;

public static RepoCreator getInstance() {
return INSTANCE;
Expand All @@ -50,10 +49,7 @@ public void run() {
LOG.info("table created: " + FileListTableRepo.FILE_LIST_TABLE_NAME);
tableExists = true;
}
if (!tableCorrected && correctTable()) {
LOG.info("table corrected: " + FileListTableRepo.FILE_LIST_TABLE_NAME);
tableCorrected = true;
}
correctTable();
} catch (Exception e) {
LOG.error("error happens in RepoCreator: ", e);
}
Expand All @@ -64,29 +60,14 @@ public boolean checkDatabaseExists() {
}

public static void createTable() throws UserException {
String sql = FileListTableRepo.SQLBuilder.buildCreateTableSql();
int expectedReplicationNum =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum();
String sql = FileListTableRepo.SQLBuilder.buildCreateTableSql(expectedReplicationNum);
RepoExecutor.getInstance().executeDDL(sql);
}

public static boolean correctTable() {
int numBackends = GlobalStateMgr.getCurrentSystemInfo().getTotalBackendNumber();
int replica = GlobalStateMgr.getCurrentState()
.mayGetDb(FileListTableRepo.FILE_LIST_DB_NAME)
.flatMap(db -> db.mayGetTable(FileListTableRepo.FILE_LIST_TABLE_NAME))
.map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum())
.orElse((short) 1);
if (numBackends < 3) {
LOG.info("not enough backends in the cluster, expected 3 but got {}", numBackends);
return false;
}
if (replica < 3) {
String sql = FileListTableRepo.SQLBuilder.buildAlterTableSql();
RepoExecutor.getInstance().executeDDL(sql);
} else {
LOG.info("table {} already has {} replicas, no need to alter replication_num",
FileListTableRepo.FILE_LIST_FULL_NAME, replica);
}
return true;
return StatisticUtils.alterSystemTableReplicationNumIfNecessary(FileListTableRepo.FILE_LIST_TABLE_NAME);
}

public boolean isDatabaseExists() {
Expand All @@ -96,8 +77,4 @@ public boolean isDatabaseExists() {
public boolean isTableExists() {
return tableExists;
}

public boolean isTableCorrected() {
return tableCorrected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -2772,6 +2773,10 @@ public Table getTable(String dbName, String tblName) {
return database.getTable(tblName);
}

public Optional<Table> mayGetTable(String dbName, String tblName) {
return Optional.ofNullable(getTable(dbName, tblName));
}

@Override
public Pair<Table, MaterializedIndexMeta> getMaterializedViewIndex(String dbName, String indexName) {
Database database = getDb(dbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.starrocks.connector.PartitionInfo;
import com.starrocks.load.EtlStatus;
import com.starrocks.load.loadv2.LoadJobFinalOperation;
import com.starrocks.load.pipe.filelist.RepoExecutor;
import com.starrocks.load.streamload.StreamLoadTxnCommitAttachment;
import com.starrocks.privilege.PrivilegeBuiltinConstants;
import com.starrocks.qe.ConnectContext;
Expand All @@ -62,6 +63,7 @@
import com.starrocks.transaction.TableCommitInfo;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TxnCommitAttachment;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.Snapshot;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -564,6 +566,32 @@ public static void dropStatisticsAfterDropTable(Table table) {
GlobalStateMgr.getCurrentStatisticStorage().expireConnectorTableColumnStatistics(table, columns);
}

/**
* Change the replication_num of system table according to cluster status
* 1. When scale-out to greater than 3 nodes, change the replication_num to 3
* 3. When scale-in to less than 3 node, change it to retainedBackendNum
*/
public static boolean alterSystemTableReplicationNumIfNecessary(String tableName) {
int expectedReplicationNum =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum();
int replica = GlobalStateMgr.getCurrentState()
.getLocalMetastore().mayGetTable(StatsConstants.STATISTICS_DB_NAME, tableName)
.map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum())
.orElse((short) 1);

if (replica != expectedReplicationNum) {
String sql = String.format("ALTER TABLE %s.%s SET ('replication_num'='%d')",
StatsConstants.STATISTICS_DB_NAME, tableName, expectedReplicationNum);
if (StringUtils.isNotEmpty(sql)) {
RepoExecutor.getInstance().executeDDL(sql);
}
LOG.info("changed replication_number of table {} from {} to {}",
tableName, replica, expectedReplicationNum);
return true;
}
return false;
}

// only support collect statistics for slotRef and subfield expr
public static String getColumnName(Table table, Expr column) {
String colName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.KeysType;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.common.util.AutoInferUtil;
Expand All @@ -38,7 +34,6 @@
import com.starrocks.sql.analyzer.Analyzer;
import com.starrocks.sql.ast.CreateDbStmt;
import com.starrocks.sql.ast.CreateTableStmt;
import com.starrocks.sql.ast.DropTableStmt;
import com.starrocks.sql.ast.HashDistributionDesc;
import com.starrocks.sql.common.EngineType;
import com.starrocks.sql.common.ErrorType;
Expand All @@ -53,9 +48,6 @@
public class StatisticsMetaManager extends FrontendDaemon {
private static final Logger LOG = LogManager.getLogger(StatisticsMetaManager.class);

// If all replicas are lost more than 3 times in a row, rebuild the statistics table
private int lossTableCount = 0;

public StatisticsMetaManager() {
super("statistics meta manager", 60L * 1000L);
}
Expand Down Expand Up @@ -83,42 +75,6 @@ private boolean checkTableExist(String tableName) {
return db.getTable(tableName) != null;
}

private boolean checkReplicateNormal(String tableName) {
int aliveSize = GlobalStateMgr.getCurrentSystemInfo().getAliveBackendNumber();
int total = GlobalStateMgr.getCurrentSystemInfo().getTotalBackendNumber();
// maybe cluster just shutdown, ignore
if (aliveSize <= total / 2) {
lossTableCount = 0;
return true;
}

Database db = GlobalStateMgr.getCurrentState().getDb(StatsConstants.STATISTICS_DB_NAME);
Preconditions.checkState(db != null);
OlapTable table = (OlapTable) db.getTable(tableName);
Preconditions.checkState(table != null);
if (table.isCloudNativeTableOrMaterializedView()) {
return true;
}

boolean check = true;
for (Partition partition : table.getPartitions()) {
// check replicate miss
if (partition.getBaseIndex().getTablets().stream()
.anyMatch(t -> ((LocalTablet) t).getNormalReplicaBackendIds().isEmpty())) {
check = false;
break;
}
}

if (!check) {
lossTableCount++;
} else {
lossTableCount = 0;
}

return lossTableCount < 3;
}

private static final List<String> KEY_COLUMN_NAMES = ImmutableList.of(
"table_id", "column_name", "db_id"
);
Expand Down Expand Up @@ -324,21 +280,6 @@ private void refreshAnalyzeJob() {
}
}

private boolean dropTable(String tableName) {
LOG.info("drop statistics table start");
DropTableStmt stmt = new DropTableStmt(true,
new TableName(StatsConstants.STATISTICS_DB_NAME, tableName), true);

try {
GlobalStateMgr.getCurrentState().dropTable(stmt);
} catch (DdlException e) {
LOG.warn("Failed to drop table" + e.getMessage());
return false;
}
LOG.info("drop statistics table done");
return !checkTableExist(tableName);
}

private void trySleep(long millis) {
try {
Thread.sleep(millis);
Expand All @@ -349,40 +290,34 @@ private void trySleep(long millis) {

private boolean createTable(String tableName) {
ConnectContext context = StatisticUtils.buildConnectContext();
context.setThreadLocalInfo();

if (tableName.equals(StatsConstants.SAMPLE_STATISTICS_TABLE_NAME)) {
return createSampleStatisticsTable(context);
} else if (tableName.equals(StatsConstants.FULL_STATISTICS_TABLE_NAME)) {
return createFullStatisticsTable(context);
} else if (tableName.equals(StatsConstants.HISTOGRAM_STATISTICS_TABLE_NAME)) {
return createHistogramStatisticsTable(context);
} else if (tableName.equals(StatsConstants.EXTERNAL_FULL_STATISTICS_TABLE_NAME)) {
return createExternalFullStatisticsTable(context);
} else if (tableName.equals(StatsConstants.EXTERNAL_HISTOGRAM_STATISTICS_TABLE_NAME)) {
return createExternalHistogramStatisticsTable(context);
} else {
throw new StarRocksPlannerException("Error table name " + tableName, ErrorType.INTERNAL_ERROR);
try (ConnectContext.ScopeGuard guard = context.bindScope()) {
if (tableName.equals(StatsConstants.SAMPLE_STATISTICS_TABLE_NAME)) {
return createSampleStatisticsTable(context);
} else if (tableName.equals(StatsConstants.FULL_STATISTICS_TABLE_NAME)) {
return createFullStatisticsTable(context);
} else if (tableName.equals(StatsConstants.HISTOGRAM_STATISTICS_TABLE_NAME)) {
return createHistogramStatisticsTable(context);
} else if (tableName.equals(StatsConstants.EXTERNAL_FULL_STATISTICS_TABLE_NAME)) {
return createExternalFullStatisticsTable(context);
} else if (tableName.equals(StatsConstants.EXTERNAL_HISTOGRAM_STATISTICS_TABLE_NAME)) {
return createExternalHistogramStatisticsTable(context);
} else {
throw new StarRocksPlannerException("Error table name " + tableName, ErrorType.INTERNAL_ERROR);
}
}
}

private void refreshStatisticsTable(String tableName) {
while (checkTableExist(tableName) && !checkReplicateNormal(tableName)) {
LOG.info("statistics table " + tableName + " replicate is not normal, will drop table and rebuild");
if (dropTable(tableName)) {
break;
}
LOG.warn("drop statistics table " + tableName + " failed");
trySleep(10000);
}

while (!checkTableExist(tableName)) {
if (createTable(tableName)) {
break;
}
LOG.warn("create statistics table " + tableName + " failed");
trySleep(10000);
}
if (checkTableExist(tableName)) {
StatisticUtils.alterSystemTableReplicationNumIfNecessary(tableName);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
Expand Down Expand Up @@ -768,6 +769,17 @@ public int getAliveBackendNumber() {
return getBackendIds(true).size();
}

public int getRetainedBackendNumber() {
return getRetainedBackends().size();
}

public int getSystemTableExpectedReplicationNum() {
if (RunMode.isSharedDataMode()) {
return 1;
}
return Integer.max(1, Integer.min(Config.default_replication_num, getRetainedBackendNumber()));
}

public int getTotalBackendNumber() {
return idToBackendRef.size();
}
Expand Down Expand Up @@ -871,12 +883,22 @@ public List<Backend> getBackends() {
return Lists.newArrayList(idToBackendRef.values());
}

/**
* Available: not decommissioned and alive
*/
public List<Backend> getAvailableBackends() {
return getBackends().stream()
.filter(ComputeNode::isAvailable)
.collect(Collectors.toList());
}

/**
* Retained: not decommissioned, whatever alive or not
*/
public List<Backend> getRetainedBackends() {
return getBackends().stream().filter(x -> !x.isDecommissioned()).collect(Collectors.toList());
}

public List<ComputeNode> getComputeNodes() {
return Lists.newArrayList(idToComputeNodeRef.values());
}
Expand Down
Loading

0 comments on commit d0d01da

Please sign in to comment.