[Feature] Support Backup/Restore for external catalog (#52895)
Signed-off-by: srlch <[email protected]>
srlch authored Nov 15, 2024
1 parent 8bda0cb commit 2225f3c
Showing 20 changed files with 579 additions and 82 deletions.
139 changes: 108 additions & 31 deletions fe/fe-core/src/main/java/com/starrocks/backup/BackupHandler.java
@@ -46,8 +46,10 @@
import com.starrocks.backup.BackupJob.BackupJobState;
import com.starrocks.backup.BackupJobInfo.BackupTableInfo;
import com.starrocks.backup.mv.MvRestoreContext;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Function;
import com.starrocks.catalog.InternalCatalog;
import com.starrocks.catalog.MaterializedIndex.IndexExtState;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.OlapTable;
@@ -74,6 +76,7 @@
import com.starrocks.sql.ast.BackupStmt;
import com.starrocks.sql.ast.BackupStmt.BackupType;
import com.starrocks.sql.ast.CancelBackupStmt;
import com.starrocks.sql.ast.CatalogRef;
import com.starrocks.sql.ast.CreateRepositoryStmt;
import com.starrocks.sql.ast.DropRepositoryStmt;
import com.starrocks.sql.ast.FunctionRef;
@@ -98,6 +101,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -108,6 +112,8 @@
public class BackupHandler extends FrontendDaemon implements Writable, MemoryTrackable {

private static final Logger LOG = LogManager.getLogger(BackupHandler.class);

private static final long FAKE_DB_ID = -1;

public static final int SIGNATURE_VERSION = 1;
public static final Path BACKUP_ROOT_DIR = Paths.get(Config.tmp_dir, "backup").normalize();
@@ -124,6 +130,8 @@ public class BackupHandler extends FrontendDaemon implements Writable, MemoryTra
// If the last job is finished, user can get the job info from repository. If the last job is cancelled,
// user can get the error message before submitting the next one.
// Use ConcurrentMap to get rid of locks.
// Backup/Restore jobs for external catalogs use -1 as the key in dbIdToBackupOrRestoreJob,
// which means that only one external catalog backup/restore job can run in the entire cluster.
protected Map<Long, AbstractJob> dbIdToBackupOrRestoreJob = Maps.newConcurrentMap();

protected MvRestoreContext mvRestoreContext = new MvRestoreContext();
@@ -261,11 +269,12 @@ public void process(AbstractBackupStmt stmt) throws DdlException {
// check if repo exist
String repoName = stmt.getRepoName();
Repository repository = repoMgr.getRepo(repoName);
Database db = null;
BackupJobInfo jobInfo = null;
if (repository == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repoName + " does not exist");
}

BackupJobInfo jobInfo = null;
if (stmt instanceof RestoreStmt) {
// Check if snapshot exist in repository, if existed, get jobInfo for restore process
List<BackupJobInfo> infos = Lists.newArrayList();
@@ -277,30 +286,49 @@ public void process(AbstractBackupStmt stmt) throws DdlException {
}
Preconditions.checkState(infos.size() == 1);
jobInfo = infos.get(0);
}

// check if db exist
String dbName = stmt.getDbName();
if (dbName == null) {
// if target dbName is null, use dbName in snapshot
dbName = jobInfo.dbName;
}
if (jobInfo.dbName == null || jobInfo.dbName.isEmpty()) {
// if jobInfo.dbName is null or empty, this snapshot only contains external catalog info
if ((stmt.getOriginDbName() != null && !stmt.getOriginDbName().isEmpty()) ||
stmt.getDbName() != null && !stmt.getDbName().isEmpty()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can not specify database for external catalog snapshot");
}

Database db = globalStateMgr.getLocalMetastore().getDb(dbName);
if (db == null) {
if (stmt instanceof RestoreStmt) {
try {
globalStateMgr.getLocalMetastore().createDb(dbName, null);
db = globalStateMgr.getLocalMetastore().getDb(dbName);
if (db == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
if (!stmt.containsExternalCatalog()) {
// set `ALL` flag for external catalog restore if no `CATALOG(s)` set in restore stmt
stmt.setAllExternalCatalog();
}
} else if (stmt.containsExternalCatalog()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"This is not a snapshot for external catalog restore, snapshot: " + stmt.getLabel());
}
}

if (!stmt.containsExternalCatalog()) {
// check if db exist
String dbName = stmt.getDbName();
if (dbName == null) {
// if target dbName is null, use dbName in snapshot
dbName = jobInfo.dbName;
}

db = globalStateMgr.getLocalMetastore().getDb(dbName);
if (db == null) {
if (stmt instanceof RestoreStmt) {
try {
globalStateMgr.getLocalMetastore().createDb(dbName, null);
db = globalStateMgr.getLocalMetastore().getDb(dbName);
if (db == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
} catch (Exception e) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can not create database: " + dbName + " in restore process");
}
} catch (Exception e) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can not create database: " + dbName + " in restore process");
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
}

@@ -311,7 +339,7 @@ public void process(AbstractBackupStmt stmt) throws DdlException {
tryLock();
try {
// Check if there is backup or restore job running on this database
AbstractJob currentJob = dbIdToBackupOrRestoreJob.get(db.getId());
AbstractJob currentJob = dbIdToBackupOrRestoreJob.get(stmt.containsExternalCatalog() ? FAKE_DB_ID : db.getId());
if (currentJob != null && !currentJob.isDone()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can only run one backup or restore job of a database at same time");
@@ -351,7 +379,9 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
List<TableRef> tblRefs = stmt.getTableRefs();
BackupMeta curBackupMeta = null;
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
if (!stmt.containsExternalCatalog()) {
locker.lockDatabase(db.getId(), LockType.READ);
}
try {
List<Table> backupTbls = Lists.newArrayList();
for (TableRef tblRef : tblRefs) {
Expand Down Expand Up @@ -410,7 +440,9 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
}
curBackupMeta = new BackupMeta(backupTbls);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
if (!stmt.containsExternalCatalog()) {
locker.unLockDatabase(db.getId(), LockType.READ);
}
}

// Check if label already be used
@@ -448,31 +480,36 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
}

// Create a backup job
BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(), db.getOriginName(), tblRefs,
long dbId = stmt.containsExternalCatalog() ? FAKE_DB_ID : db.getId();
String dbName = stmt.containsExternalCatalog() ? "" : db.getOriginName();

BackupJob backupJob = new BackupJob(stmt.getLabel(), dbId, dbName, tblRefs,
stmt.getTimeoutMs(), globalStateMgr, repository.getId());
List<Function> allFunctions = Lists.newArrayList();
for (FunctionRef fnRef : stmt.getFnRefs()) {
allFunctions.addAll(fnRef.getFunctions());
}
backupJob.setBackupFunctions(allFunctions);
backupJob.setBackupCatalogs(stmt.getExternalCatalogRefs().stream()
.map(CatalogRef::getCatalog).collect(Collectors.toList()));
// write log
globalStateMgr.getEditLog().logBackupJob(backupJob);

// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
dbIdToBackupOrRestoreJob.put(db.getId(), backupJob);
dbIdToBackupOrRestoreJob.put(dbId, backupJob);

LOG.info("finished to submit backup job: {}", backupJob);
}

private void restore(Repository repository, Database db, RestoreStmt stmt, BackupJobInfo jobInfo) throws DdlException {
BackupMeta backupMeta = downloadAndDeserializeMetaInfo(jobInfo, repository, stmt);

// check whether the original dbName exists in the snapshot
if (!stmt.getOriginDbName().isEmpty() && !stmt.getOriginDbName().equals(jobInfo.dbName)) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"target database: " + stmt.getOriginDbName() + " is not existed in snapshot");
}

BackupMeta backupMeta = downloadAndDeserializeMetaInfo(jobInfo, repository, stmt);

// If restore statement contains `ON` clause, filter the specified backup objects which are needed through information
// provided in stmt and BackupMeta.
if (stmt.withOnClause()) {
@@ -485,6 +522,10 @@ private void restore(Repository repository, Database db, RestoreStmt stmt, Backu
checkAndFilterRestoreFunctionsInBackupMeta(stmt, backupMeta);
}

if (stmt.containsExternalCatalog()) {
checkAndFilterRestoreCatalogsInBackupMeta(stmt, backupMeta);
}

// Create a restore job
RestoreJob restoreJob = null;
if (backupMeta != null) {
@@ -498,13 +539,17 @@ private void restore(Repository repository, Database db, RestoreStmt stmt, Backu
mvRestoreContext.addIntoMvBaseTableBackupInfoIfNeeded(db.getOriginName(), remoteTbl, jobInfo, tblInfo);
}
}

long dbId = stmt.containsExternalCatalog() ? FAKE_DB_ID : db.getId();
String dbName = stmt.containsExternalCatalog() ? "" : db.getOriginName();

restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getOriginName(), jobInfo, stmt.allowLoad(), stmt.getReplicationNum(),
dbId, dbName, jobInfo, stmt.allowLoad(), stmt.getReplicationNum(),
stmt.getTimeoutMs(), globalStateMgr, repository.getId(), backupMeta, mvRestoreContext);
globalStateMgr.getEditLog().logRestoreJob(restoreJob);

// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
dbIdToBackupOrRestoreJob.put(db.getId(), restoreJob);
dbIdToBackupOrRestoreJob.put(dbId, restoreJob);

LOG.info("finished to submit restore job: {}", restoreJob);
}
Expand All @@ -528,6 +573,33 @@ protected BackupMeta downloadAndDeserializeMetaInfo(BackupJobInfo jobInfo, Repos
return backupMetas.get(0);
}

protected void checkAndFilterRestoreCatalogsInBackupMeta(RestoreStmt stmt, BackupMeta backupMeta) throws DdlException {
List<Catalog> catalogsInBackupMeta = backupMeta.getCatalogs();
List<Catalog> restoredCatalogs = Lists.newArrayList();
for (CatalogRef catalogRef : stmt.getExternalCatalogRefs()) {
Optional<Catalog> hitCatalog = catalogsInBackupMeta.stream().filter(x -> catalogRef.getCatalogName()
.equalsIgnoreCase(x.getName())).findFirst();
if (!hitCatalog.isPresent()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can not find restore catalog: " + catalogRef.getCatalogName());
}

if (catalogRef.getAlias() != null && !catalogRef.getAlias().isEmpty()) {
if (catalogRef.getAlias().equalsIgnoreCase(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME)) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Do not support set alias as default catalog for external catalog restore");
}
hitCatalog.get().setName(catalogRef.getAlias());
}

restoredCatalogs.add(hitCatalog.get());
}

if (!restoredCatalogs.isEmpty()) {
backupMeta.setCatalogs(restoredCatalogs);
}
}

protected void checkAndFilterRestoreFunctionsInBackupMeta(RestoreStmt stmt, BackupMeta backupMeta) throws DdlException {
List<Function> functionsInBackupMeta = backupMeta.getFunctions();
List<Function> restoredFunctions = Lists.newArrayList();
@@ -630,7 +702,12 @@ public AbstractJob getAbstractJobByDbName(String dbName) throws DdlException {
}

public void cancel(CancelBackupStmt stmt) throws DdlException {
AbstractJob job = getAbstractJobByDbName(stmt.getDbName());
AbstractJob job = null;
if (!stmt.isExternalCatalog()) {
job = getAbstractJobByDbName(stmt.getDbName());
} else {
job = dbIdToBackupOrRestoreJob.get(FAKE_DB_ID);
}
if (job == null || (job instanceof BackupJob && stmt.isRestore())
|| (job instanceof RestoreJob && !stmt.isRestore())) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "No "
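The core mechanism in BackupHandler above is that an external-catalog job is keyed under the sentinel FAKE_DB_ID (-1) in dbIdToBackupOrRestoreJob instead of a real database id, which is what restricts the cluster to one external-catalog backup/restore job at a time. The sketch below illustrates that keying pattern in isolation; the class and method names are made up for the example (they are not the StarRocks API), and the real handler also keeps finished jobs in the map so their status can still be queried.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the job-slot keying used by BackupHandler: ordinary jobs are keyed
// by database id, while every external-catalog job shares the single sentinel key -1,
// so at most one external-catalog backup/restore job can run at a time.
public class JobSlots {
    private static final long FAKE_DB_ID = -1; // sentinel key for external-catalog jobs

    // analogous to dbIdToBackupOrRestoreJob (simplified to a label per slot)
    private final Map<Long, String> dbIdToJob = new ConcurrentHashMap<>();

    /** Returns true if the job was registered, false if the slot is already occupied. */
    public boolean tryRegister(long dbId, boolean externalCatalog, String jobLabel) {
        long key = externalCatalog ? FAKE_DB_ID : dbId;
        // putIfAbsent keeps the first registration and rejects a concurrent second one
        return dbIdToJob.putIfAbsent(key, jobLabel) == null;
    }

    public void finish(long dbId, boolean externalCatalog) {
        dbIdToJob.remove(externalCatalog ? FAKE_DB_ID : dbId);
    }

    public static void main(String[] args) {
        JobSlots slots = new JobSlots();
        System.out.println(slots.tryRegister(10, false, "backup_db10"));      // true
        System.out.println(slots.tryRegister(10, true, "backup_catalogs_a")); // true: different slot
        System.out.println(slots.tryRegister(42, true, "backup_catalogs_b")); // false: slot -1 taken
    }
}
```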
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/backup/BackupJob.java
@@ -46,6 +46,7 @@
import com.starrocks.analysis.BrokerDesc;
import com.starrocks.analysis.TableRef;
import com.starrocks.backup.Status.ErrCode;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.FsBroker;
import com.starrocks.catalog.Function;
@@ -158,6 +159,8 @@ public enum BackupJobState {

@SerializedName(value = "backupFunctions")
private List<Function> backupFunctions = Lists.newArrayList();
@SerializedName(value = "backupCatalogs")
private List<Catalog> backupCatalogs = Lists.newArrayList();

public BackupJob() {
super(JobType.BACKUP);
@@ -206,6 +209,10 @@ public void setBackupFunctions(List<Function> functions) {
this.backupFunctions = functions;
}

public void setBackupCatalogs(List<Catalog> backupCatalogs) {
this.backupCatalogs = backupCatalogs;
}

public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishTaskRequest request) {
Preconditions.checkState(task.getJobId() == jobId);

@@ -463,6 +470,14 @@ protected void sendSnapshotRequests() {

private void prepareAndSendSnapshotTask() {
MetricRepo.COUNTER_UNFINISHED_BACKUP_JOB.increase(1L);
if (!backupCatalogs.isEmpty()) {
// shortcut for external catalog backup
backupMeta = new BackupMeta(Lists.newArrayList());
backupMeta.setCatalogs(backupCatalogs);
state = BackupJobState.SAVE_META;

return;
}
Database db = globalStateMgr.getLocalMetastore().getDb(dbId);
if (db == null) {
status = new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist");
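The BackupJob change above is a short circuit: when backupCatalogs is non-empty there are no tablets to snapshot, so the job builds a BackupMeta directly from the catalog list and jumps to SAVE_META instead of sending snapshot tasks. A rough, self-contained sketch of that decision (state and class names below are stand-ins for the example, not the actual job classes):

```java
import java.util.List;

// Sketch of the shortcut added in prepareAndSendSnapshotTask(): an external-catalog
// backup carries only catalog metadata, so it can skip the tablet-snapshot phase.
public class CatalogBackupShortCut {
    enum State { SNAPSHOTING, SAVE_META }

    static State nextState(List<String> backupCatalogs) {
        if (!backupCatalogs.isEmpty()) {
            // nothing to snapshot: persist the catalog metadata and move on
            return State.SAVE_META;
        }
        // a regular table backup still has to send snapshot tasks to the backends
        return State.SNAPSHOTING;
    }

    public static void main(String[] args) {
        System.out.println(nextState(List.of("hive_catalog"))); // SAVE_META
        System.out.println(nextState(List.of()));               // SNAPSHOTING
    }
}
```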
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/backup/BackupMeta.java
@@ -37,6 +37,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Function;
import com.starrocks.catalog.Table;
import com.starrocks.common.io.Text;
@@ -67,6 +68,8 @@ public class BackupMeta implements Writable, GsonPostProcessable {
private Map<Long, Table> tblIdMap = Maps.newHashMap();
@SerializedName(value = "functions")
private List<Function> functions = Lists.newArrayList();
@SerializedName(value = "catalogs")
private List<Catalog> catalogs = Lists.newArrayList();

private BackupMeta() {

@@ -99,6 +102,14 @@ public List<Function> getFunctions() {
return functions;
}

public void setCatalogs(List<Catalog> catalogs) {
this.catalogs = catalogs;
}

public List<Catalog> getCatalogs() {
return catalogs;
}

public static BackupMeta fromFile(String filePath, int starrocksMetaVersion) throws IOException {
File file = new File(filePath);
try (DataInputStream dis = new DataInputStream(new FileInputStream(file))) {
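BackupMeta itself only gains a Gson-serialized catalogs list next to the existing tables and functions. The stand-alone sketch below shows the same serialization pattern with stub classes (they are not the real Catalog or BackupMeta types); because the field has an initializer and the stub has a default constructor, deserializing JSON that lacks a "catalogs" entry leaves the list empty rather than null.

```java
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
import java.util.List;

// Stub illustrating how the new "catalogs" field rides along in the backup metadata:
// it is just another @SerializedName-annotated list on the meta object.
public class MetaWithCatalogs {
    static class CatalogStub {
        @SerializedName("name")
        String name;
        CatalogStub() { }
        CatalogStub(String name) { this.name = name; }
    }

    @SerializedName("catalogs")
    List<CatalogStub> catalogs = new ArrayList<>();

    public static void main(String[] args) {
        MetaWithCatalogs meta = new MetaWithCatalogs();
        meta.catalogs.add(new CatalogStub("hive_prod"));

        Gson gson = new Gson();
        String json = gson.toJson(meta);   // {"catalogs":[{"name":"hive_prod"}]}
        MetaWithCatalogs back = gson.fromJson(json, MetaWithCatalogs.class);
        System.out.println(json + " -> restored catalog: " + back.catalogs.get(0).name);
    }
}
```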