Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support Backup/Restore for external catalog #52895

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 108 additions & 31 deletions fe/fe-core/src/main/java/com/starrocks/backup/BackupHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@
import com.starrocks.backup.BackupJob.BackupJobState;
import com.starrocks.backup.BackupJobInfo.BackupTableInfo;
import com.starrocks.backup.mv.MvRestoreContext;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Function;
import com.starrocks.catalog.InternalCatalog;
import com.starrocks.catalog.MaterializedIndex.IndexExtState;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.OlapTable;
Expand All @@ -74,6 +76,7 @@
import com.starrocks.sql.ast.BackupStmt;
import com.starrocks.sql.ast.BackupStmt.BackupType;
import com.starrocks.sql.ast.CancelBackupStmt;
import com.starrocks.sql.ast.CatalogRef;
import com.starrocks.sql.ast.CreateRepositoryStmt;
import com.starrocks.sql.ast.DropRepositoryStmt;
import com.starrocks.sql.ast.FunctionRef;
Expand All @@ -98,6 +101,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -108,6 +112,8 @@
public class BackupHandler extends FrontendDaemon implements Writable, MemoryTrackable {

private static final Logger LOG = LogManager.getLogger(BackupHandler.class);

private static final long FAKE_DB_ID = -1;

public static final int SIGNATURE_VERSION = 1;
public static final Path BACKUP_ROOT_DIR = Paths.get(Config.tmp_dir, "backup").normalize();
Expand All @@ -124,6 +130,8 @@ public class BackupHandler extends FrontendDaemon implements Writable, MemoryTra
// If the last job is finished, user can get the job info from repository. If the last job is cancelled,
// user can get the error message before submitting the next one.
// Use ConcurrentMap to get rid of locks.
// Backup/restore jobs for external catalogs use -1 (FAKE_DB_ID) as their key in dbIdToBackupOrRestoreJob,
// which means that only one external catalog backup/restore job can run in the entire cluster at a time.
srlch marked this conversation as resolved.
Show resolved Hide resolved
protected Map<Long, AbstractJob> dbIdToBackupOrRestoreJob = Maps.newConcurrentMap();

protected MvRestoreContext mvRestoreContext = new MvRestoreContext();
Expand Down Expand Up @@ -261,11 +269,12 @@ public void process(AbstractBackupStmt stmt) throws DdlException {
// check if repo exist
String repoName = stmt.getRepoName();
Repository repository = repoMgr.getRepo(repoName);
Database db = null;
BackupJobInfo jobInfo = null;
if (repository == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repoName + " does not exist");
}

BackupJobInfo jobInfo = null;
if (stmt instanceof RestoreStmt) {
// Check if snapshot exist in repository, if existed, get jobInfo for restore process
List<BackupJobInfo> infos = Lists.newArrayList();
Expand All @@ -277,30 +286,49 @@ public void process(AbstractBackupStmt stmt) throws DdlException {
}
Preconditions.checkState(infos.size() == 1);
jobInfo = infos.get(0);
}

// check if db exist
String dbName = stmt.getDbName();
if (dbName == null) {
// if target dbName is null, use dbName in snapshot
dbName = jobInfo.dbName;
}
if (jobInfo.dbName == null || jobInfo.dbName.isEmpty()) {
// if jobInfo.dbName == null, means that this snapshot only contains external catalog info
if ((stmt.getOriginDbName() != null && !stmt.getOriginDbName().isEmpty()) ||
stmt.getDbName() != null && !stmt.getDbName().isEmpty()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can not specify database for external catalog snapshot");
srlch marked this conversation as resolved.
Show resolved Hide resolved
}

Database db = globalStateMgr.getLocalMetastore().getDb(dbName);
if (db == null) {
if (stmt instanceof RestoreStmt) {
try {
globalStateMgr.getLocalMetastore().createDb(dbName, null);
db = globalStateMgr.getLocalMetastore().getDb(dbName);
if (db == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
if (!stmt.containsExternalCatalog()) {
// set `ALL` flag for external catalog restore if no `CATALOG(s)` set in restore stmt
stmt.setAllExternalCatalog();
}
} else if (stmt.containsExternalCatalog()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"This is not a snapshot for external catalog restore, snapshot: " + stmt.getLabel());
}
}

if (!stmt.containsExternalCatalog()) {
// check if db exist
String dbName = stmt.getDbName();
if (dbName == null) {
// if target dbName is null, use dbName in snapshot
dbName = jobInfo.dbName;
}

db = globalStateMgr.getLocalMetastore().getDb(dbName);
if (db == null) {
if (stmt instanceof RestoreStmt) {
try {
globalStateMgr.getLocalMetastore().createDb(dbName, null);
db = globalStateMgr.getLocalMetastore().getDb(dbName);
if (db == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
} catch (Exception e) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can not create database: " + dbName + " in restore process");
}
} catch (Exception e) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can not create database: " + dbName + " in restore process");
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
}

Expand All @@ -311,7 +339,7 @@ public void process(AbstractBackupStmt stmt) throws DdlException {
tryLock();
try {
// Check if there is backup or restore job running on this database
AbstractJob currentJob = dbIdToBackupOrRestoreJob.get(db.getId());
AbstractJob currentJob = dbIdToBackupOrRestoreJob.get(stmt.containsExternalCatalog() ? FAKE_DB_ID : db.getId());
if (currentJob != null && !currentJob.isDone()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can only run one backup or restore job of a database at same time");
Expand Down Expand Up @@ -351,7 +379,9 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
List<TableRef> tblRefs = stmt.getTableRefs();
BackupMeta curBackupMeta = null;
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
if (!stmt.containsExternalCatalog()) {
locker.lockDatabase(db.getId(), LockType.READ);
}
try {
List<Table> backupTbls = Lists.newArrayList();
for (TableRef tblRef : tblRefs) {
Expand Down Expand Up @@ -410,7 +440,9 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
}
curBackupMeta = new BackupMeta(backupTbls);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
if (!stmt.containsExternalCatalog()) {
locker.unLockDatabase(db.getId(), LockType.READ);
}
}

// Check if label already be used
Expand Down Expand Up @@ -448,31 +480,36 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
}

// Create a backup job
BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(), db.getOriginName(), tblRefs,
long dbId = stmt.containsExternalCatalog() ? FAKE_DB_ID : db.getId();
String dbName = stmt.containsExternalCatalog() ? "" : db.getOriginName();

BackupJob backupJob = new BackupJob(stmt.getLabel(), dbId, dbName, tblRefs,
stmt.getTimeoutMs(), globalStateMgr, repository.getId());
List<Function> allFunctions = Lists.newArrayList();
for (FunctionRef fnRef : stmt.getFnRefs()) {
allFunctions.addAll(fnRef.getFunctions());
}
backupJob.setBackupFunctions(allFunctions);
backupJob.setBackupCatalogs(stmt.getExternalCatalogRefs().stream()
.map(CatalogRef::getCatalog).collect(Collectors.toList()));
// write log
globalStateMgr.getEditLog().logBackupJob(backupJob);

// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
dbIdToBackupOrRestoreJob.put(db.getId(), backupJob);
dbIdToBackupOrRestoreJob.put(dbId, backupJob);

LOG.info("finished to submit backup job: {}", backupJob);
}

private void restore(Repository repository, Database db, RestoreStmt stmt, BackupJobInfo jobInfo) throws DdlException {
BackupMeta backupMeta = downloadAndDeserializeMetaInfo(jobInfo, repository, stmt);

// check the original dbName existed in snapshot or not
if (!stmt.getOriginDbName().isEmpty() && !stmt.getOriginDbName().equals(jobInfo.dbName)) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"target database: " + stmt.getOriginDbName() + " is not existed in snapshot");
}

BackupMeta backupMeta = downloadAndDeserializeMetaInfo(jobInfo, repository, stmt);

// If restore statement contains `ON` clause, filter the specified backup objects which are needed through information
// provided in stmt and BackupMeta.
if (stmt.withOnClause()) {
Expand All @@ -485,6 +522,10 @@ private void restore(Repository repository, Database db, RestoreStmt stmt, Backu
checkAndFilterRestoreFunctionsInBackupMeta(stmt, backupMeta);
}

if (stmt.containsExternalCatalog()) {
checkAndFilterRestoreCatalogsInBackupMeta(stmt, backupMeta);
}

// Create a restore job
RestoreJob restoreJob = null;
if (backupMeta != null) {
Expand All @@ -498,13 +539,17 @@ private void restore(Repository repository, Database db, RestoreStmt stmt, Backu
mvRestoreContext.addIntoMvBaseTableBackupInfoIfNeeded(db.getOriginName(), remoteTbl, jobInfo, tblInfo);
}
}

long dbId = stmt.containsExternalCatalog() ? FAKE_DB_ID : db.getId();
String dbName = stmt.containsExternalCatalog() ? "" : db.getOriginName();

restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getOriginName(), jobInfo, stmt.allowLoad(), stmt.getReplicationNum(),
dbId, dbName, jobInfo, stmt.allowLoad(), stmt.getReplicationNum(),
stmt.getTimeoutMs(), globalStateMgr, repository.getId(), backupMeta, mvRestoreContext);
globalStateMgr.getEditLog().logRestoreJob(restoreJob);

// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
dbIdToBackupOrRestoreJob.put(db.getId(), restoreJob);
dbIdToBackupOrRestoreJob.put(dbId, restoreJob);

LOG.info("finished to submit restore job: {}", restoreJob);
}
Expand All @@ -528,6 +573,33 @@ protected BackupMeta downloadAndDeserializeMetaInfo(BackupJobInfo jobInfo, Repos
return backupMetas.get(0);
}

/**
 * Validates the external catalogs requested by a restore statement against the catalogs
 * stored in the snapshot's {@link BackupMeta}, and narrows the meta down to the requested ones.
 *
 * <p>Every {@link CatalogRef} in {@code stmt.getExternalCatalogRefs()} must match (case-insensitively)
 * the name of a catalog in {@code backupMeta}. If a ref carries a non-empty alias, the matched
 * catalog is renamed to that alias; an alias equal to the default internal catalog name is rejected.
 *
 * <p>Resolution and renaming are done in two separate passes. All refs are first resolved against
 * the names as they exist in the snapshot, and only afterwards are aliases applied. Renaming inside
 * the lookup loop would let a later ref match a catalog that an earlier ref has already renamed
 * (e.g. {@code CATALOG a AS b, CATALOG b AS c} could pick the renamed {@code a} instead of the
 * snapshot's {@code b}).
 *
 * <p>If the statement lists no catalog refs, the catalogs in {@code backupMeta} are left untouched.
 *
 * @param stmt       restore statement providing the requested catalog refs
 * @param backupMeta snapshot meta whose catalog list is validated and, if needed, replaced
 * @throws DdlException reported through {@link ErrorReport} when a requested catalog is not in the
 *                      snapshot, or when an alias equals the default internal catalog name
 */
protected void checkAndFilterRestoreCatalogsInBackupMeta(RestoreStmt stmt, BackupMeta backupMeta) throws DdlException {
    List<Catalog> catalogsInBackupMeta = backupMeta.getCatalogs();
    List<Catalog> restoredCatalogs = Lists.newArrayList();
    // aliases.get(i) is the alias to apply to restoredCatalogs.get(i), or null when no rename is requested
    List<String> aliases = Lists.newArrayList();

    // Pass 1: resolve and validate every ref against the snapshot's original catalog names.
    // Nothing is mutated here, so the lookup result does not depend on the order of the refs.
    for (CatalogRef catalogRef : stmt.getExternalCatalogRefs()) {
        Optional<Catalog> hitCatalog = catalogsInBackupMeta.stream().filter(x -> catalogRef.getCatalogName()
                .equalsIgnoreCase(x.getName())).findFirst();
        if (!hitCatalog.isPresent()) {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                    "Can not find restore catalog: " + catalogRef.getCatalogName());
        }

        String alias = catalogRef.getAlias();
        boolean hasAlias = alias != null && !alias.isEmpty();
        if (hasAlias && alias.equalsIgnoreCase(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME)) {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                    "Do not support set alias as default catalog for external catalog restore");
        }

        restoredCatalogs.add(hitCatalog.get());
        aliases.add(hasAlias ? alias : null);
    }

    // Pass 2: apply the requested aliases now that all lookups are done.
    for (int i = 0; i < restoredCatalogs.size(); i++) {
        String alias = aliases.get(i);
        if (alias != null) {
            restoredCatalogs.get(i).setName(alias);
        }
    }

    // An empty selection means "no explicit catalog list": keep whatever the snapshot contains.
    if (!restoredCatalogs.isEmpty()) {
        backupMeta.setCatalogs(restoredCatalogs);
    }
}

protected void checkAndFilterRestoreFunctionsInBackupMeta(RestoreStmt stmt, BackupMeta backupMeta) throws DdlException {
List<Function> functionsInBackupMeta = backupMeta.getFunctions();
List<Function> restoredFunctions = Lists.newArrayList();
Expand Down Expand Up @@ -630,7 +702,12 @@ public AbstractJob getAbstractJobByDbName(String dbName) throws DdlException {
}

public void cancel(CancelBackupStmt stmt) throws DdlException {
AbstractJob job = getAbstractJobByDbName(stmt.getDbName());
AbstractJob job = null;
if (!stmt.isExternalCatalog()) {
job = getAbstractJobByDbName(stmt.getDbName());
} else {
job = dbIdToBackupOrRestoreJob.get(FAKE_DB_ID);
}
if (job == null || (job instanceof BackupJob && stmt.isRestore())
|| (job instanceof RestoreJob && !stmt.isRestore())) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "No "
Expand Down
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.starrocks.analysis.BrokerDesc;
import com.starrocks.analysis.TableRef;
import com.starrocks.backup.Status.ErrCode;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.FsBroker;
import com.starrocks.catalog.Function;
Expand Down Expand Up @@ -158,6 +159,8 @@ public enum BackupJobState {

@SerializedName(value = "backupFunctions")
private List<Function> backupFunctions = Lists.newArrayList();
@SerializedName(value = "backupCatalogs")
private List<Catalog> backupCatalogs = Lists.newArrayList();

public BackupJob() {
super(JobType.BACKUP);
Expand Down Expand Up @@ -206,6 +209,10 @@ public void setBackupFunctions(List<Function> functions) {
this.backupFunctions = functions;
}

/**
 * Sets the external catalogs that this backup job should back up.
 *
 * <p>The list is stored by reference (no defensive copy). When it is non-empty,
 * {@code prepareAndSendSnapshotTask} builds a {@code BackupMeta} that carries only these catalogs
 * and moves the job directly to {@code SAVE_META} instead of looking up a database.
 *
 * @param backupCatalogs catalogs to include in the backup
 */
public void setBackupCatalogs(List<Catalog> backupCatalogs) {
this.backupCatalogs = backupCatalogs;
}

public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishTaskRequest request) {
Preconditions.checkState(task.getJobId() == jobId);

Expand Down Expand Up @@ -463,6 +470,14 @@ protected void sendSnapshotRequests() {

private void prepareAndSendSnapshotTask() {
MetricRepo.COUNTER_UNFINISHED_BACKUP_JOB.increase(1L);
if (!backupCatalogs.isEmpty()) {
// short cut for external catalogs backup
backupMeta = new BackupMeta(Lists.newArrayList());
backupMeta.setCatalogs(backupCatalogs);
state = BackupJobState.SAVE_META;

return;
}
Database db = globalStateMgr.getLocalMetastore().getDb(dbId);
if (db == null) {
status = new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist");
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/backup/BackupMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Function;
import com.starrocks.catalog.Table;
import com.starrocks.common.io.Text;
Expand Down Expand Up @@ -67,6 +68,8 @@ public class BackupMeta implements Writable, GsonPostProcessable {
private Map<Long, Table> tblIdMap = Maps.newHashMap();
@SerializedName(value = "functions")
private List<Function> functions = Lists.newArrayList();
@SerializedName(value = "catalogs")
private List<Catalog> catalogs = Lists.newArrayList();

private BackupMeta() {

Expand Down Expand Up @@ -99,6 +102,14 @@ public List<Function> getFunctions() {
return functions;
}

/**
 * Replaces the catalogs recorded in this backup meta.
 *
 * <p>The given list is stored by reference; no copy is made.
 *
 * @param catalogs catalogs to record in this meta
 */
public void setCatalogs(List<Catalog> catalogs) {
this.catalogs = catalogs;
}

/**
 * Returns the catalogs recorded in this backup meta.
 *
 * <p>The internal list itself is returned, not a copy, so changes made through it
 * are visible to this meta.
 *
 * @return the catalog list held by this meta (an empty list by default)
 */
public List<Catalog> getCatalogs() {
return catalogs;
}

public static BackupMeta fromFile(String filePath, int starrocksMetaVersion) throws IOException {
File file = new File(filePath);
try (DataInputStream dis = new DataInputStream(new FileInputStream(file))) {
Expand Down
Loading
Loading