Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] auto change replication_num of system tables (backport #51799) #51885

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
package com.starrocks.load.loadv2;

import com.starrocks.catalog.CatalogUtils;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.Config;
import com.starrocks.common.UserException;
import com.starrocks.common.util.AutoInferUtil;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.load.pipe.filelist.RepoExecutor;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatisticUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -67,7 +67,7 @@ public class LoadsHistorySyncer extends FrontendDaemon {
"properties('replication_num' = '%d') ";

private static final String CORRECT_LOADS_HISTORY_REPLICATION_NUM =
"ALTER TABLE %s SET ('default.replication_num'='3')";
"ALTER TABLE %s SET ('default.replication_num'='%d')";

private static final String LOADS_HISTORY_SYNC =
"INSERT INTO %s " +
Expand All @@ -77,10 +77,9 @@ public class LoadsHistorySyncer extends FrontendDaemon {
"AND load_finish_time > ( " +
"SELECT COALESCE(MAX(load_finish_time), '0001-01-01 00:00:00') " +
"FROM %s);";

private boolean databaseExists = false;
private boolean tableExists = false;
private boolean tableCorrected = false;

public LoadsHistorySyncer() {
super("Load history syncer", Config.loads_history_sync_interval_second * 1000L);
Expand All @@ -96,6 +95,7 @@ public static void createTable() throws UserException {
}

public static boolean correctTable() {
<<<<<<< HEAD
int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber();
int replica = GlobalStateMgr.getCurrentState()
.mayGetDb(LOADS_HISTORY_DB_NAME)
Expand All @@ -114,6 +114,9 @@ public static boolean correctTable() {
LOADS_HISTORY_TABLE_NAME, replica);
}
return true;
=======
return StatisticUtils.alterSystemTableReplicationNumIfNecessary(LOADS_HISTORY_TABLE_NAME);
>>>>>>> 0c0ea45ed1 ([Enhancement] auto change replication_num of system tables (#51799))
}

public void checkMeta() throws UserException {
Expand All @@ -129,10 +132,8 @@ public void checkMeta() throws UserException {
LOG.info("table created: " + LOADS_HISTORY_TABLE_NAME);
tableExists = true;
}
if (!tableCorrected && correctTable()) {
LOG.info("table corrected: " + LOADS_HISTORY_TABLE_NAME);
tableCorrected = true;
}

correctTable();

if (getInterval() != Config.loads_history_sync_interval_second * 1000L) {
setInterval(Config.loads_history_sync_interval_second * 1000L);
Expand Down Expand Up @@ -168,9 +169,9 @@ public static String buildCreateTableSql() throws UserException {
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), replica);
}

public static String buildAlterTableSql() {
public static String buildAlterTableSql(int replica) {
return String.format(CORRECT_LOADS_HISTORY_REPLICATION_NUM,
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME));
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), replica);
}

public static String buildSyncSql() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.starrocks.catalog.CatalogUtils;
import com.starrocks.common.UserException;
import com.starrocks.common.util.AutoInferUtil;
import com.starrocks.load.pipe.PipeFileRecord;
import com.starrocks.statistic.StatsConstants;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -64,7 +63,7 @@ public class FileListTableRepo extends FileListRepo {
"properties('replication_num' = '%d') ";

protected static final String CORRECT_FILE_LIST_REPLICATION_NUM =
"ALTER TABLE %s SET ('replication_num'='3')";
"ALTER TABLE %s SET ('replication_num'='%d')";

protected static final String ALL_COLUMNS =
"`pipe_id`, `file_name`, `file_version`, `file_size`, `state`, `last_modified`, `staged_time`," +
Expand Down Expand Up @@ -155,15 +154,14 @@ public void destroy() {
*/
static class SQLBuilder {

public static String buildCreateTableSql() throws UserException {
int replica = AutoInferUtil.calDefaultReplicationNum();
public static String buildCreateTableSql(int replicationNum) throws UserException {
return String.format(FILE_LIST_TABLE_CREATE,
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replica);
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replicationNum);
}

public static String buildAlterTableSql() {
public static String buildAlterTableSql(int replicationNum) {
return String.format(CORRECT_FILE_LIST_REPLICATION_NUM,
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME));
CatalogUtils.normalizeTableName(FILE_LIST_DB_NAME, FILE_LIST_TABLE_NAME), replicationNum);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

package com.starrocks.load.pipe.filelist;

import com.starrocks.catalog.OlapTable;
import com.starrocks.common.UserException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatisticUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -30,7 +30,6 @@ public class RepoCreator {

private static boolean databaseExists = false;
private static boolean tableExists = false;
private static boolean tableCorrected = false;

public static RepoCreator getInstance() {
return INSTANCE;
Expand All @@ -50,10 +49,7 @@ public void run() {
LOG.info("table created: " + FileListTableRepo.FILE_LIST_TABLE_NAME);
tableExists = true;
}
if (!tableCorrected && correctTable()) {
LOG.info("table corrected: " + FileListTableRepo.FILE_LIST_TABLE_NAME);
tableCorrected = true;
}
correctTable();
} catch (Exception e) {
LOG.error("error happens in RepoCreator: ", e);
}
Expand All @@ -64,11 +60,14 @@ public boolean checkDatabaseExists() {
}

public static void createTable() throws UserException {
String sql = FileListTableRepo.SQLBuilder.buildCreateTableSql();
int expectedReplicationNum =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum();
String sql = FileListTableRepo.SQLBuilder.buildCreateTableSql(expectedReplicationNum);
RepoExecutor.getInstance().executeDDL(sql);
}

public static boolean correctTable() {
<<<<<<< HEAD
int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber();
int replica = GlobalStateMgr.getCurrentState()
.mayGetDb(FileListTableRepo.FILE_LIST_DB_NAME)
Expand All @@ -87,6 +86,9 @@ public static boolean correctTable() {
FileListTableRepo.FILE_LIST_FULL_NAME, replica);
}
return true;
=======
return StatisticUtils.alterSystemTableReplicationNumIfNecessary(FileListTableRepo.FILE_LIST_TABLE_NAME);
>>>>>>> 0c0ea45ed1 ([Enhancement] auto change replication_num of system tables (#51799))
}

public boolean isDatabaseExists() {
Expand All @@ -96,8 +98,4 @@ public boolean isDatabaseExists() {
public boolean isTableExists() {
return tableExists;
}

public boolean isTableCorrected() {
return tableCorrected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class TableKeeper {
private final String databaseName;
private final String tableName;
private final String createTableSql;
private final int tableReplicas;

private boolean databaseExisted = false;
private boolean tableExisted = false;
Expand All @@ -52,12 +51,10 @@ public class TableKeeper {
public TableKeeper(String database,
String table,
String createTable,
int expectedReplicas,
Supplier<Integer> ttlSupplier) {
this.databaseName = database;
this.tableName = table;
this.createTableSql = createTable;
this.tableReplicas = expectedReplicas;
this.ttlSupplier = ttlSupplier;
}

Expand Down Expand Up @@ -100,23 +97,21 @@ public void createTable() throws UserException {
}

public void correctTable() {
int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber();
int expectedReplicationNum =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum();
int replica = GlobalStateMgr.getCurrentState()
.mayGetDb(databaseName)
.flatMap(db -> db.mayGetTable(tableName))
.map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum())
.orElse((short) 1);
if (numBackends < tableReplicas) {
LOG.info("not enough backends in the cluster, expected {} but got {}",
tableReplicas, numBackends);
return;
}
if (replica < tableReplicas) {
String sql = alterTableReplicas();

if (replica != expectedReplicationNum) {
String sql = alterTableReplicas(expectedReplicationNum);
if (StringUtils.isNotEmpty(sql)) {
RepoExecutor.getInstance().executeDDL(sql);
}
LOG.info("changed replication_number of table {} to {}", tableName, replica);
LOG.info("changed replication_number of table {} from {} to {}",
tableName, replica, expectedReplicationNum);
}
}

Expand Down Expand Up @@ -148,21 +143,21 @@ private Optional<OlapTable> mayGetTable() {
.flatMap(x -> Optional.of((OlapTable) x));
}

private String alterTableReplicas() {
private String alterTableReplicas(int replicationNum) {
Optional<OlapTable> table = mayGetTable();
if (table.isEmpty()) {
return "";
}
PartitionInfo partitionInfo = table.get().getPartitionInfo();
if (partitionInfo.isRangePartition()) {
String sql1 = String.format("ALTER TABLE %s.%s MODIFY PARTITION(*) SET ('replication_num'='%d');",
databaseName, tableName, tableReplicas);
databaseName, tableName, replicationNum);
String sql2 = String.format("ALTER TABLE %s.%s SET ('default.replication_num'='%d');",
databaseName, tableName, tableReplicas);
databaseName, tableName, replicationNum);
return sql1 + sql2;
} else {
return String.format("ALTER TABLE %s.%s SET ('replication_num'='%d')",
databaseName, tableName, tableReplicas);
databaseName, tableName, replicationNum);
}
}

Expand All @@ -182,10 +177,6 @@ public String getCreateTableSql() {
return createTableSql;
}

public int getTableReplicas() {
return tableReplicas;
}

public boolean isDatabaseExisted() {
return databaseExisted;
}
Expand All @@ -202,14 +193,6 @@ public void setDatabaseExisted(boolean databaseExisted) {
this.databaseExisted = databaseExisted;
}

public void setTableExisted(boolean tableExisted) {
this.tableExisted = tableExisted;
}

public void setTableCorrected(boolean tableCorrected) {
this.tableCorrected = tableCorrected;
}

public static TableKeeperDaemon startDaemon() {
TableKeeperDaemon daemon = TableKeeperDaemon.getInstance();
daemon.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class TaskRunHistoryTable {
public static final String DATABASE_NAME = StatsConstants.STATISTICS_DB_NAME;
public static final String TABLE_NAME = "task_run_history";
public static final String TABLE_FULL_NAME = DATABASE_NAME + "." + TABLE_NAME;
public static final int TABLE_REPLICAS = 3;
public static final String CREATE_TABLE =
String.format("CREATE TABLE IF NOT EXISTS %s (" +
// identifiers
Expand Down Expand Up @@ -93,7 +92,7 @@ public class TaskRunHistoryTable {
"SELECT history_content_json " + "FROM " + TABLE_FULL_NAME + " WHERE ";

private static final TableKeeper KEEPER =
new TableKeeper(DATABASE_NAME, TABLE_NAME, CREATE_TABLE, TABLE_REPLICAS,
new TableKeeper(DATABASE_NAME, TABLE_NAME, CREATE_TABLE,
() -> Math.max(1, Config.task_runs_ttl_second / 3600 / 24));

public static TableKeeper createKeeper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.starrocks.connector.ConnectorPartitionTraits;
import com.starrocks.load.EtlStatus;
import com.starrocks.load.loadv2.LoadJobFinalOperation;
import com.starrocks.load.pipe.filelist.RepoExecutor;
import com.starrocks.load.streamload.StreamLoadTxnCommitAttachment;
import com.starrocks.privilege.PrivilegeBuiltinConstants;
import com.starrocks.qe.ConnectContext;
Expand All @@ -61,6 +62,7 @@
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TxnCommitAttachment;
import com.starrocks.warehouse.Warehouse;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -475,6 +477,32 @@ public static void dropStatisticsAfterDropTable(Table table) {
GlobalStateMgr.getCurrentState().getStatisticStorage().expireConnectorTableColumnStatistics(table, columns);
}

/**
* Change the replication_num of system table according to cluster status
* 1. When scale-out to greater than 3 nodes, change the replication_num to 3
* 3. When scale-in to less than 3 node, change it to retainedBackendNum
*/
public static boolean alterSystemTableReplicationNumIfNecessary(String tableName) {
int expectedReplicationNum =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getSystemTableExpectedReplicationNum();
int replica = GlobalStateMgr.getCurrentState()
.getLocalMetastore().mayGetTable(StatsConstants.STATISTICS_DB_NAME, tableName)
.map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum())
.orElse((short) 1);

if (replica != expectedReplicationNum) {
String sql = String.format("ALTER TABLE %s.%s SET ('replication_num'='%d')",
StatsConstants.STATISTICS_DB_NAME, tableName, expectedReplicationNum);
if (StringUtils.isNotEmpty(sql)) {
RepoExecutor.getInstance().executeDDL(sql);
}
LOG.info("changed replication_number of table {} from {} to {}",
tableName, replica, expectedReplicationNum);
return true;
}
return false;
}

// only support collect statistics for slotRef and subfield expr
public static String getColumnName(Table table, Expr column) {
String colName;
Expand Down
Loading
Loading