in fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java [576:1050]
/**
 * Validates the restore target against the downloaded backup meta and prepares everything the
 * CREATING phase needs.
 *
 * <p>The steps are, in order:
 * <ol>
 *   <li>resolve the target database and allocate a new job id;</li>
 *   <li>download and deserialize the backup meta;</li>
 *   <li>check the existing local objects the restore would touch (OLAP tables, views, ODBC tables,
 *       ODBC resources). Existing OLAP tables are either flagged as in atomic restore, or switched
 *       (with the restored partitions) to the RESTORE state;</li>
 *   <li>under the database read lock:
 *     <ul>
 *       <li>compare local and remote schemas and partitions;</li>
 *       <li>stage the new tables into {@code restoredTbls} and the new partitions into
 *           {@code restoredPartitions};</li>
 *       <li>build the create-replica tasks and the file mappings;</li>
 *     </ul>
 *   </li>
 *   <li>restore resources;</li>
 *   <li>send the create-replica tasks outside the database lock and switch the job to
 *       {@code CREATING}.</li>
 * </ol>
 *
 * <p>On a validation failure, {@code status} is set and the method returns early without changing
 * {@code state}. In a few places the failure is reported by a helper, which presumably sets
 * {@code status} itself. Table and partition states already changed in step 3 are not rolled back
 * by this method.
 */
private void checkAndPrepareMeta() {
    Database db = env.getInternalCatalog().getDbNullable(dbId);
    if (db == null) {
        status = new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist");
        return;
    }

    // generate job id
    jobId = env.getNextId();

    // deserialize meta
    if (!downloadAndDeserializeMetaInfo()) {
        // NOTE(review): presumably the helper has already recorded the failure in 'status'.
        return;
    }
    Preconditions.checkNotNull(backupMeta);

    // Check the olap table state.
    //
    // If isAtomicRestore is not set, set all restored tbls' state to RESTORE,
    // the table's origin state must be NORMAL and does not have unfinished load job.
    for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
        Table tbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName));
        if (tbl == null) {
            continue;
        }

        if (tbl.getType() != TableType.OLAP) {
            status = new Status(ErrCode.COMMON_ERROR, "Only support restore OLAP table: " + tbl.getName());
            return;
        }

        OlapTable olapTbl = (OlapTable) tbl;
        if (!olapTbl.writeLockIfExist()) {
            // The lock could not be taken (presumably the table was dropped meanwhile); skip it.
            continue;
        }
        try {
            if (olapTbl.getState() != OlapTableState.NORMAL) {
                status = new Status(ErrCode.COMMON_ERROR,
                        "Table " + tbl.getName() + "'s state is not NORMAL: " + olapTbl.getState().name());
                return;
            }

            if (olapTbl.existTempPartitions()) {
                status = new Status(ErrCode.COMMON_ERROR, "Do not support restoring table with temp partitions");
                return;
            }

            if (isAtomicRestore) {
                // We will create new OlapTable in atomic restore, so does not set the RESTORE state.
                // Instead, set table in atomic restore state, to forbid the alter table operation.
                olapTbl.setInAtomicRestore();
                continue;
            }

            for (Partition partition : olapTbl.getPartitions()) {
                if (!env.getLoadInstance().checkPartitionLoadFinished(partition.getId(), null)) {
                    status = new Status(ErrCode.COMMON_ERROR,
                            "Table " + tbl.getName() + "'s has unfinished load job");
                    return;
                }
            }

            olapTbl.setState(OlapTableState.RESTORE);
            // set restore status for partitions
            BackupOlapTableInfo tblInfo = jobInfo.backupOlapTableObjects.get(tableName);
            for (Map.Entry<String, BackupPartitionInfo> partitionEntry : tblInfo.partitions.entrySet()) {
                String partitionName = partitionEntry.getKey();
                Partition partition = olapTbl.getPartition(partitionName);
                if (partition == null) {
                    continue;
                }
                partition.setState(PartitionState.RESTORE);
            }
        } finally {
            // Runs on every exit path above, including 'continue' and 'return'.
            olapTbl.writeUnlock();
        }
    }

    // An existing local view must really be a view.
    for (BackupJobInfo.BackupViewInfo backupViewInfo : jobInfo.newBackupObjects.views) {
        Table tbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(backupViewInfo.name));
        if (tbl == null) {
            continue;
        }
        if (tbl.getType() != TableType.VIEW) {
            status = new Status(ErrCode.COMMON_ERROR,
                    "The local table " + tbl.getName()
                            + " with the same name but a different type of backup meta.");
            return;
        }
    }

    // An existing local ODBC table must really be an ODBC table.
    for (BackupJobInfo.BackupOdbcTableInfo backupOdbcTableInfo : jobInfo.newBackupObjects.odbcTables) {
        Table tbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(backupOdbcTableInfo.dorisTableName));
        if (tbl == null) {
            continue;
        }
        if (tbl.getType() != TableType.ODBC) {
            status = new Status(ErrCode.COMMON_ERROR,
                    "The local table " + tbl.getName()
                            + " with the same name but a different type of backup meta.");
            return;
        }
    }

    // An existing local resource with the same name must be an ODBC catalog resource.
    for (BackupJobInfo.BackupOdbcResourceInfo backupOdbcResourceInfo : jobInfo.newBackupObjects.odbcResources) {
        Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(backupOdbcResourceInfo.name);
        if (resource == null) {
            continue;
        }
        if (resource.getType() != Resource.ResourceType.ODBC_CATALOG) {
            status = new Status(ErrCode.COMMON_ERROR,
                    "The local resource " + resource.getName()
                            + " with the same name but a different type of backup meta.");
            return;
        }
    }

    // the new tablets -> { local tablet, schema hash, storage medium }, used in atomic restore.
    Map<Long, TabletRef> tabletBases = new HashMap<>();

    // Check and prepare meta objects.
    Map<Long, AgentBatchTask> batchTaskPerTable = new HashMap<>();

    // The tables that are restored but not committed, because the table name may be changed.
    List<Table> stagingRestoreTables = Lists.newArrayList();

    // restoredPartitions records the LOCAL table name (which differs from the backup name when an
    // alias is used), while backupMeta and jobInfo are keyed by the name in the backup. Remember
    // the mapping so the remote table and its backup info can be found later.
    Map<String, String> originNameByLocalName = new HashMap<>();

    db.readLock();
    try {
        for (Map.Entry<String, BackupOlapTableInfo> olapTableEntry : jobInfo.backupOlapTableObjects.entrySet()) {
            String tableName = olapTableEntry.getKey();
            BackupOlapTableInfo tblInfo = olapTableEntry.getValue();
            Table remoteTbl = backupMeta.getTable(tableName);
            Preconditions.checkNotNull(remoteTbl);
            Table localTbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName));
            boolean isSchemaChanged = false;
            if (localTbl != null && localTbl.getType() != TableType.OLAP) {
                // table already exist, but is not OLAP
                status = new Status(ErrCode.COMMON_ERROR,
                        "The type of local table should be same as type of remote table: "
                                + remoteTbl.getName());
                return;
            }

            if (localTbl != null) {
                OlapTable localOlapTbl = (OlapTable) localTbl;
                OlapTable remoteOlapTbl = (OlapTable) remoteTbl;

                if (localOlapTbl.isColocateTable() || (reserveColocate && remoteOlapTbl.isColocateTable())) {
                    status = new Status(ErrCode.COMMON_ERROR, "Not support to restore to local table "
                            + tableName + " with colocate group.");
                    return;
                }

                localOlapTbl.readLock();
                try {
                    List<String> intersectPartNames = Lists.newArrayList();
                    Status st = localOlapTbl.getIntersectPartNamesWith(remoteOlapTbl, intersectPartNames);
                    if (!st.ok()) {
                        if (isForceReplace) {
                            LOG.info("{}, will force replace, job: {}",
                                    st.getErrMsg(), this);
                            isSchemaChanged = true;
                        } else {
                            status = st;
                            return;
                        }
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("get intersect part names: {}, job: {}", intersectPartNames, this);
                    }
                    String localTblSignature = localOlapTbl.getSignature(
                            BackupHandler.SIGNATURE_VERSION, intersectPartNames);
                    String remoteTblSignature = remoteOlapTbl.getSignature(
                            BackupHandler.SIGNATURE_VERSION, intersectPartNames);
                    if (!localTblSignature.equals(remoteTblSignature)) {
                        if (isForceReplace) {
                            LOG.info("Table {} already exists but with different schema, will force replace, "
                                    + "local table: {}, remote table: {}",
                                    tableName, localTblSignature, remoteTblSignature);
                            isSchemaChanged = true;
                        } else {
                            String alias = jobInfo.getAliasByOriginNameIfSet(tableName);
                            LOG.warn("Table {} already exists but with different schema, "
                                    + "local table: {}, remote table: {}",
                                    alias, localTblSignature, remoteTblSignature);
                            status = new Status(ErrCode.COMMON_ERROR, "Table "
                                    + alias + " already exist but with different schema");
                            return;
                        }
                    }

                    // Table with same name and has same schema. Check partition
                    for (Map.Entry<String, BackupPartitionInfo> partitionEntry : tblInfo.partitions.entrySet()) {
                        String partitionName = partitionEntry.getKey();
                        BackupPartitionInfo backupPartInfo = partitionEntry.getValue();
                        Partition localPartition = localOlapTbl.getPartition(partitionName);
                        Partition remotePartition = remoteOlapTbl.getPartition(partitionName);
                        if (localPartition != null) {
                            // Partition already exist.
                            PartitionInfo localPartInfo = localOlapTbl.getPartitionInfo();
                            PartitionInfo remotePartInfo = remoteOlapTbl.getPartitionInfo();
                            ReplicaAllocation remoteReplicaAlloc = remotePartInfo.getReplicaAllocation(
                                    remotePartition.getId());
                            if (localPartInfo.getType() == PartitionType.RANGE
                                    || localPartInfo.getType() == PartitionType.LIST) {
                                PartitionItem localItem = localPartInfo.getItem(localPartition.getId());
                                PartitionItem remoteItem = remoteOlapTbl
                                        .getPartitionInfo().getItem(backupPartInfo.id);
                                if (!localItem.equals(remoteItem)) {
                                    // Same partition name, different range
                                    if (isForceReplace) {
                                        isSchemaChanged = true;
                                    } else {
                                        status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
                                                + " in table " + localTbl.getName()
                                                + " has different partition item with partition in repository");
                                        return;
                                    }
                                }
                            }

                            if (isAtomicRestore) {
                                // skip gen file mapping for atomic restore.
                                continue;
                            }

                            // Same partition, same range or a single partitioned table.
                            // NOTE(review): a 'true' result aborts the job; presumably the helper has
                            // already set 'status'.
                            if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition,
                                    localTbl, backupPartInfo, partitionName, tblInfo, remoteReplicaAlloc)) {
                                return;
                            }
                        } else if (!isAtomicRestore) {
                            // partitions does not exist
                            PartitionInfo localPartitionInfo = localOlapTbl.getPartitionInfo();
                            if (localPartitionInfo.getType() == PartitionType.RANGE
                                    || localPartitionInfo.getType() == PartitionType.LIST) {
                                PartitionItem remoteItem = remoteOlapTbl.getPartitionInfo()
                                        .getItem(backupPartInfo.id);
                                if (localPartitionInfo.getAnyIntersectItem(remoteItem, false) != null) {
                                    status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
                                            + " in table " + localTbl.getName()
                                            + " has conflict partition item with existing items");
                                    return;
                                } else {
                                    // this partition can be added to this table, set ids
                                    ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
                                    if (reserveReplica) {
                                        PartitionInfo remotePartInfo = remoteOlapTbl.getPartitionInfo();
                                        restoreReplicaAlloc = remotePartInfo.getReplicaAllocation(
                                                remotePartition.getId());
                                    }
                                    Partition restorePart = resetPartitionForRestore(localOlapTbl, remoteOlapTbl,
                                            partitionName,
                                            restoreReplicaAlloc);
                                    if (restorePart == null) {
                                        // NOTE(review): presumably the helper has already set 'status'.
                                        return;
                                    }
                                    restoredPartitions.add(Pair.of(localOlapTbl.getName(), restorePart));
                                    // Remember the backup name of this table for the meta lookups below.
                                    originNameByLocalName.put(localOlapTbl.getName(), tableName);
                                }
                            } else {
                                // It is impossible that a single partitioned table exist
                                // without any existing partition
                                status = new Status(ErrCode.COMMON_ERROR,
                                        "No partition exist in single partitioned table " + localOlapTbl.getName());
                                return;
                            }
                        }
                    }
                } finally {
                    localOlapTbl.readUnlock();
                }
            }

            // Table does not exist or atomic restore
            if (localTbl == null || isAtomicRestore) {
                OlapTable remoteOlapTbl = (OlapTable) remoteTbl;
                // Retain only expected restore partitions in this table;
                Set<String> allPartNames = remoteOlapTbl.getPartitionNames();
                for (String partName : allPartNames) {
                    if (!tblInfo.containsPart(partName)) {
                        remoteOlapTbl.dropPartitionAndReserveTablet(partName);
                    }
                }

                // reset all ids in this table
                String srcDbName = jobInfo.dbName;
                Status st = remoteOlapTbl.resetIdsForRestore(env, db, replicaAlloc, reserveReplica,
                        reserveColocate, colocatePersistInfos, srcDbName);
                if (!st.ok()) {
                    status = st;
                    return;
                }

                // reset next version to visible version + 1 for all partitions
                remoteOlapTbl.resetVersionForRestore();

                // Reset properties to correct values.
                remoteOlapTbl.resetPropertiesForRestore(reserveDynamicPartitionEnable, reserveReplica,
                        replicaAlloc, isBeingSynced);

                // DO NOT set remote table's new name here, cause we will still need the origin name later
                // remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
                remoteOlapTbl.setState(allowLoad ? OlapTableState.RESTORE_WITH_LOAD : OlapTableState.RESTORE);

                if (isAtomicRestore && localTbl != null && !isSchemaChanged) {
                    // bind the backends and base tablets from local tbl.
                    status = bindLocalAndRemoteOlapTableReplicas((OlapTable) localTbl, remoteOlapTbl, tabletBases);
                    if (!status.ok()) {
                        return;
                    }
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("put remote table {} to restoredTbls", remoteOlapTbl.getName());
                }
                stagingRestoreTables.add(remoteOlapTbl);
            }
        } // end of all restore olap tables

        // restore views
        for (BackupJobInfo.BackupViewInfo backupViewInfo : jobInfo.newBackupObjects.views) {
            String backupViewName = backupViewInfo.name;
            Table localTbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(backupViewName));
            View remoteView = (View) backupMeta.getTable(backupViewName);
            if (localTbl != null) {
                Preconditions.checkState(localTbl.getType() == TableType.VIEW);
                View localView = (View) localTbl;
                String localViewSignature = localView.getSignature(BackupHandler.SIGNATURE_VERSION);
                // keep compatible with old version, compare the signature without reset view def
                if (!localViewSignature.equals(remoteView.getSignature(BackupHandler.SIGNATURE_VERSION))) {
                    // reset view def to dest db name and compare signature again
                    String srcDbName = jobInfo.dbName;
                    remoteView.resetViewDefForRestore(srcDbName, db.getName());
                    if (!localViewSignature.equals(remoteView.getSignature(BackupHandler.SIGNATURE_VERSION))) {
                        if (isForceReplace) {
                            LOG.info("View {} already exist but with different schema, will force replace, "
                                    + "local view: {}, remote view: {}",
                                    backupViewName, localViewSignature,
                                    remoteView.getSignature(BackupHandler.SIGNATURE_VERSION));
                        } else {
                            // Not force-replacing: the restore fails, so do not claim otherwise in the log.
                            LOG.warn("View {} already exist but with different schema, "
                                    + "local view: {}, remote view: {}",
                                    backupViewName, localViewSignature,
                                    remoteView.getSignature(BackupHandler.SIGNATURE_VERSION));
                            status = new Status(ErrCode.COMMON_ERROR, "View "
                                    + jobInfo.getAliasByOriginNameIfSet(backupViewName)
                                    + " already exist but with different schema");
                            return;
                        }
                    }
                }
            }

            if (localTbl == null || isAtomicRestore) {
                String srcDbName = jobInfo.dbName;
                remoteView.resetViewDefForRestore(srcDbName, db.getName());
                remoteView.resetIdsForRestore(env);
                stagingRestoreTables.add(remoteView);
            }
        }

        // restore odbc external table
        for (BackupJobInfo.BackupOdbcTableInfo backupOdbcTableInfo : jobInfo.newBackupObjects.odbcTables) {
            String backupOdbcTableName = backupOdbcTableInfo.dorisTableName;
            Table localTbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(backupOdbcTableName));
            OdbcTable remoteOdbcTable = (OdbcTable) backupMeta.getTable(backupOdbcTableName);
            if (localTbl != null) {
                Preconditions.checkState(localTbl.getType() == TableType.ODBC);
                OdbcTable localOdbcTable = (OdbcTable) localTbl;
                if (!localOdbcTable.getSignature(BackupHandler.SIGNATURE_VERSION)
                        .equals(remoteOdbcTable.getSignature(BackupHandler.SIGNATURE_VERSION))) {
                    status = new Status(ErrCode.COMMON_ERROR, "Odbc table "
                            + jobInfo.getAliasByOriginNameIfSet(backupOdbcTableName)
                            + " already exist but with different schema");
                    return;
                }
            } else {
                remoteOdbcTable.resetIdsForRestore(env);
                stagingRestoreTables.add(remoteOdbcTable);
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("finished to prepare restored partitions and tables. {}", this);
        }
        // for now, nothing is modified in catalog

        // generate create replica tasks for all restored partitions
        if (isAtomicRestore && !restoredPartitions.isEmpty()) {
            throw new RuntimeException("atomic restore is set, but the restored partitions is not empty");
        }
        for (Pair<String, Partition> entry : restoredPartitions) {
            OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first);
            // Use the recorded name as the message: dereferencing localTbl here would NPE exactly
            // when the check is supposed to fail.
            Preconditions.checkNotNull(localTbl, entry.first);
            Partition restorePart = entry.second;
            // backupMeta and jobInfo are keyed by the table name in the backup, which differs from
            // the local name when the table is restored under an alias.
            String originTblName = originNameByLocalName.getOrDefault(entry.first, entry.first);
            OlapTable remoteTbl = (OlapTable) backupMeta.getTable(originTblName);
            BackupPartitionInfo backupPartitionInfo
                    = jobInfo.getOlapTableInfo(originTblName).getPartInfo(restorePart.getName());

            AgentBatchTask batchTask = batchTaskPerTable.get(localTbl.getId());
            if (batchTask == null) {
                batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
                batchTaskPerTable.put(localTbl.getId(), batchTask);
            }
            createReplicas(db, batchTask, localTbl, restorePart);

            genFileMapping(localTbl, restorePart, remoteTbl.getId(), backupPartitionInfo,
                    !allowLoad /* if allow load, do not overwrite when commit */);
        }

        // generate create replica task for all restored tables
        for (Table restoreTbl : stagingRestoreTables) {
            if (restoreTbl.getType() == TableType.OLAP) {
                OlapTable restoreOlapTable = (OlapTable) restoreTbl;
                for (Partition restorePart : restoreOlapTable.getPartitions()) {
                    AgentBatchTask batchTask = batchTaskPerTable.get(restoreTbl.getId());
                    if (batchTask == null) {
                        batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
                        batchTaskPerTable.put(restoreTbl.getId(), batchTask);
                    }
                    createReplicas(db, batchTask, restoreOlapTable, restorePart, tabletBases);
                    // The staged table still carries its backup name here (see the rename below),
                    // so it can be used to look up the backup info directly.
                    BackupOlapTableInfo backupOlapTableInfo = jobInfo.getOlapTableInfo(restoreOlapTable.getName());
                    genFileMapping(restoreOlapTable, restorePart, backupOlapTableInfo.id,
                            backupOlapTableInfo.getPartInfo(restorePart.getName()),
                            !allowLoad /* if allow load, do not overwrite when commit */,
                            tabletBases);
                }
            }
            // set restored table's new name after all 'genFileMapping'
            String tableName = jobInfo.getAliasByOriginNameIfSet(restoreTbl.getName());
            if (Env.isStoredTableNamesLowerCase()) {
                tableName = tableName.toLowerCase();
            }
            if ((restoreTbl.getType() == TableType.OLAP || restoreTbl
                    .getType() == TableType.VIEW) && isAtomicRestore) {
                tableName = tableAliasWithAtomicRestore(tableName);
            }
            restoreTbl.setName(tableName);
            restoredTbls.add(restoreTbl);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("finished to generate create replica tasks. {}", this);
        }
    } finally {
        db.readUnlock();
    }

    // check and restore resources
    checkAndRestoreResources();
    if (!status.ok()) {
        return;
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("finished to restore resources. {}", this.jobId);
    }

    // Send create replica task to BE outside the db lock
    int numBatchTasks = batchTaskPerTable.values()
            .stream()
            .mapToInt(AgentBatchTask::getTaskNum)
            .sum();
    createReplicaTasksLatch = new MarkedCountDownLatch<>(numBatchTasks);
    if (numBatchTasks > 0) {
        LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. {}", numBatchTasks, this);
        for (AgentBatchTask batchTask : batchTaskPerTable.values()) {
            for (AgentTask task : batchTask.getAllTasks()) {
                createReplicaTasksLatch.addMark(task.getBackendId(), task.getTabletId());
                ((CreateReplicaTask) task).setLatch(createReplicaTasksLatch);
                AgentTaskQueue.addTask(task);
            }
            AgentTaskExecutor.submit(batchTask);
        }
    }

    // No log here, PENDING state restore job will redo this method
    state = RestoreJobState.CREATING;
    createReplicasTimeStamp = System.currentTimeMillis();
}