in hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java [3954:4103]
TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
TransactionInfo checkpointTrxnInfo) throws Exception {
long startTime = Time.monotonicNow();
File oldDBLocation = metadataManager.getStore().getDbLocation();
try {
// Stop Background services
keyManager.stop();
stopSecretManager();
stopTrashEmptier();
omSnapshotManager.invalidateCache();
// Pause the State Machine so that no new transactions can be applied.
// This action also clears the OM Double Buffer so that if there are any
// pending transactions in the buffer, they are discarded.
omRatisServer.getOmStateMachine().pause();
} catch (Exception e) {
LOG.error("Failed to stop/ pause the services. Cannot proceed with " +
"installing the new checkpoint.");
// Stop the checkpoint install process and restart the services.
keyManager.start(configuration);
startSecretManagerIfNecessary();
startTrashEmptier(configuration);
throw e;
}
File dbBackup = null;
TermIndex termIndex = omRatisServer.getLastAppliedTermIndex();
long term = termIndex.getTerm();
long lastAppliedIndex = termIndex.getIndex();
// Check if current applied log index is smaller than the downloaded
// checkpoint transaction index. If yes, proceed by stopping the ratis
// server so that the OM state can be re-initialized. If no then do not
// proceed with installSnapshot.
boolean canProceed = OzoneManagerRatisUtils.verifyTransactionInfo(
checkpointTrxnInfo, lastAppliedIndex, leaderId, checkpointLocation);
boolean oldOmMetadataManagerStopped = false;
boolean newMetadataManagerStarted = false;
boolean omRpcServerStopped = false;
long time = Time.monotonicNow();
if (canProceed) {
// Stop RPC server before stop metadataManager
omRpcServer.stop();
isOmRpcServerRunning = false;
omRpcServerStopped = true;
LOG.info("RPC server is stopped. Spend " +
(Time.monotonicNow() - time) + " ms.");
try {
// Stop old metadataManager before replacing DB Dir
time = Time.monotonicNow();
metadataManager.stop();
oldOmMetadataManagerStopped = true;
LOG.info("metadataManager is stopped. Spend " +
(Time.monotonicNow() - time) + " ms.");
} catch (Exception e) {
String errorMsg = "Failed to stop metadataManager. Cannot proceed " +
"with installing the new checkpoint.";
LOG.error(errorMsg);
exitManager.exitSystem(1, errorMsg, e, LOG);
}
try {
time = Time.monotonicNow();
dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex,
oldDBLocation, checkpointLocation);
term = checkpointTrxnInfo.getTerm();
lastAppliedIndex = checkpointTrxnInfo.getTransactionIndex();
LOG.info("Replaced DB with checkpoint from OM: {}, term: {}, " +
"index: {}, time: {} ms", leaderId, term, lastAppliedIndex,
Time.monotonicNow() - time);
} catch (Exception e) {
LOG.error("Failed to install Snapshot from {} as OM failed to replace" +
" DB with downloaded checkpoint. Reloading old OM state.",
leaderId, e);
}
} else {
LOG.warn("Cannot proceed with InstallSnapshot as OM is at TermIndex {} " +
"and checkpoint has lower TermIndex {}. Reloading old state of OM.",
termIndex, checkpointTrxnInfo.getTermIndex());
}
if (oldOmMetadataManagerStopped) {
// Close snapDiff's rocksDB instance only if metadataManager gets closed.
omSnapshotManager.close();
}
// Reload the OM DB store with the new checkpoint.
// Restart (unpause) the state machine and update its last applied index
// to the installed checkpoint's snapshot index.
try {
if (oldOmMetadataManagerStopped) {
time = Time.monotonicNow();
reloadOMState();
setTransactionInfo(TransactionInfo.valueOf(termIndex));
omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
newMetadataManagerStarted = true;
LOG.info("Reloaded OM state with Term: {} and Index: {}. Spend {} ms",
term, lastAppliedIndex, Time.monotonicNow() - time);
} else {
// OM DB is not stopped. Start the services.
keyManager.start(configuration);
startSecretManagerIfNecessary();
startTrashEmptier(configuration);
omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
LOG.info("OM DB is not stopped. Started services with Term: {} and " +
"Index: {}", term, lastAppliedIndex);
}
} catch (Exception ex) {
String errorMsg = "Failed to reload OM state and instantiate services.";
exitManager.exitSystem(1, errorMsg, ex, LOG);
}
if (omRpcServerStopped && newMetadataManagerStarted) {
// Start the RPC server. RPC server start requires metadataManager
try {
time = Time.monotonicNow();
omRpcServer = getRpcServer(configuration);
omRpcServer.start();
isOmRpcServerRunning = true;
LOG.info("RPC server is re-started. Spend " +
(Time.monotonicNow() - time) + " ms.");
} catch (Exception e) {
String errorMsg = "Failed to start RPC Server.";
exitManager.exitSystem(1, errorMsg, e, LOG);
}
}
// Delete the backup DB
try {
if (dbBackup != null) {
FileUtils.deleteFully(dbBackup);
}
} catch (Exception e) {
LOG.error("Failed to delete the backup of the original DB {}",
dbBackup, e);
}
if (lastAppliedIndex != checkpointTrxnInfo.getTransactionIndex()) {
// Install Snapshot failed and old state was reloaded. Return null to
// Ratis to indicate that installation failed.
return null;
}
// TODO: We should only return the snpashotIndex to the leader.
// Should be fixed after RATIS-586
TermIndex newTermIndex = TermIndex.valueOf(term, lastAppliedIndex);
LOG.info("Install Checkpoint is finished with Term: {} and Index: {}. " +
"Spend {} ms.", newTermIndex.getTerm(), newTermIndex.getIndex(),
(Time.monotonicNow() - startTime));
return newTermIndex;
}