in src/cluster/replication.cc [622:756]
// Read callback that drives the meta-fetching part of a full synchronization
// and then performs the file fetch and restore.
//
// The work is split into three stages tracked by `fullsync_state_`:
//   kFetchMetaID      - read the meta/backup id line (skipped when
//                       `master_use_repl_port` is off);
//   kFetchMetaSize    - read the size of the meta file;
//   kFetchMetaContent - read the meta content (old master) or the
//                       comma-separated file list (new master), fetch the
//                       files in parallel and restore the database.
// A stage that completes falls through into the next one so that data which
// is already buffered is consumed within the same invocation.
//
// Returns:
//   CBState::AGAIN   - the data needed by the current stage is not fully
//                      buffered yet (or the stage was just switched);
//   CBState::RESTART - the master replied with an error line, a value was
//                      invalid, or fetching/restoring failed;
//   CBState::QUIT    - the full sync finished and `psync_steps_` was started.
ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
  auto input = bufferevent_get_input(bev);

  // Failure exit for the stages that run before `fullsync_state_` is rewound
  // below: rewind it here as well, so the next attempt does not resume in the
  // middle of the handshake with a stale meta id / file size.
  // NOTE(review): harmless if the state is also reset elsewhere on restart;
  // that is not visible from this function.
  auto restart_fullsync = [this] {
    fullsync_state_ = kFetchMetaID;
    return CBState::RESTART;
  };

  switch (fullsync_state_) {
    case kFetchMetaID: {
      // New version master only sends meta file content
      if (!srv_->GetConfig()->master_use_repl_port) {
        fullsync_state_ = kFetchMetaContent;
        return CBState::AGAIN;
      }

      UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
      if (!line) return CBState::AGAIN;
      // A leading '-' marks an error reply from the master
      if (line[0] == '-') {
        error("[replication] Failed to fetch meta id: {}", line.get());
        return restart_fullsync();
      }

      // An empty or non-numeric line yields 0, which is rejected below
      fullsync_meta_id_ = static_cast<rocksdb::BackupID>(line.length > 0 ? std::strtoul(line.get(), nullptr, 10) : 0);
      if (fullsync_meta_id_ == 0) {
        error("[replication] Invalid meta id received");
        return restart_fullsync();
      }

      fullsync_state_ = kFetchMetaSize;
      info("[replication] Succeed fetching meta id: {}", fullsync_meta_id_);
      [[fallthrough]];
    }
    case kFetchMetaSize: {
      UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
      if (!line) return CBState::AGAIN;
      if (line[0] == '-') {
        error("[replication] Failed to fetch meta size: {}", line.get());
        return restart_fullsync();
      }

      // An empty or non-numeric line yields 0, which is rejected below
      fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(), nullptr, 10) : 0;
      if (fullsync_filesize_ == 0) {
        error("[replication] Invalid meta file size received");
        return restart_fullsync();
      }

      fullsync_state_ = kFetchMetaContent;
      info("[replication] Succeed fetching meta size: {}", fullsync_filesize_);
      [[fallthrough]];
    }
    case kFetchMetaContent: {
      std::string target_dir;
      engine::Storage::ReplDataManager::MetaInfo meta;

      if (srv_->GetConfig()->master_use_repl_port) {
        // Master using old version: wait until the whole meta file is buffered
        if (evbuffer_get_length(input) < fullsync_filesize_) {
          return CBState::AGAIN;
        }

        auto parse_status =
            engine::Storage::ReplDataManager::ParseMetaAndSave(storage_, fullsync_meta_id_, input, &meta);
        if (!parse_status.IsOK()) {
          error("[replication] Failed to parse meta and save: {}", parse_status.Msg());
          // Restart like every other failure in this function. AGAIN is the
          // "waiting for more input" result, so returning it here would not
          // abort the failed attempt.
          return restart_fullsync();
        }
        target_dir = srv_->GetConfig()->backup_sync_dir;
      } else {
        // Master using new version: a single line with the needed file names
        UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
        if (!line) return CBState::AGAIN;
        if (line[0] == '-') {
          error("[replication] Failed to fetch meta info: {}", line.get());
          return restart_fullsync();
        }

        std::vector<std::string> need_files = util::Split(std::string(line.get()), ",");
        for (const auto &f : need_files) {
          meta.files.emplace_back(f, 0);
        }
        target_dir = srv_->GetConfig()->sync_checkpoint_dir;

        // Clean invalid files of checkpoint, "CURRENT" file must be invalid
        // because we identify one file by its file number but only "CURRENT"
        // file doesn't have number.
        auto iter = std::find(need_files.begin(), need_files.end(), "CURRENT");
        if (iter != need_files.end()) need_files.erase(iter);

        auto clean_status = engine::Storage::ReplDataManager::CleanInvalidFiles(storage_, target_dir, need_files);
        if (!clean_status.IsOK()) {
          warn("[replication] Failed to clean up invalid files of the old checkpoint, error: {}", clean_status.Msg());
          warn("[replication] Try to clean all checkpoint files");
          // Best effort: a failure here is only logged and the sync goes on
          auto destroy_status = rocksdb::DestroyDB(target_dir, rocksdb::Options());
          if (!destroy_status.ok()) {
            warn("[replication] Failed to clean all checkpoint files, error: {}", destroy_status.ToString());
          }
        }
      }

      assert(evbuffer_get_length(input) == 0);
      // Rewind the stage now, so that every failure below restarts the
      // handshake from its first stage.
      fullsync_state_ = kFetchMetaID;
      info("[replication] Succeeded fetching full data files info, fetching files in parallel");

      bool pre_fullsync_done = false;
      // If 'slave-empty-db-before-fullsync' is yes, we call 'pre_fullsync_cb_'
      // just like reloading database. And we don't want slave to occupy too much
      // disk space, so we just empty entire database rudely.
      if (srv_->GetConfig()->slave_empty_db_before_fullsync) {
        if (!pre_fullsync_cb_()) return CBState::RESTART;
        pre_fullsync_done = true;
        storage_->EmptyDB();
      }

      repl_state_.store(kReplFetchSST, std::memory_order_relaxed);
      auto s = parallelFetchFile(target_dir, meta.files);
      if (!s.IsOK()) {
        // Pair the post callback with the pre callback that already ran
        if (pre_fullsync_done) post_fullsync_cb_();
        error("[replication] Failed to parallel fetch files while {}", s.Msg());
        return CBState::RESTART;
      }
      info("[replication] Succeeded fetching files in parallel, restoring the backup");

      // Don't need to call 'pre_fullsync_cb_' again if it was called before
      if (!pre_fullsync_done && !pre_fullsync_cb_()) return CBState::RESTART;

      // For old version, master uses rocksdb backup to implement data snapshot
      if (srv_->GetConfig()->master_use_repl_port) {
        s = storage_->RestoreFromBackup();
      } else {
        s = storage_->RestoreFromCheckpoint();
      }
      if (!s.IsOK()) {
        error("[replication] Failed to restore backup while {}, restart fullsync", s.Msg());
        post_fullsync_cb_();
        return CBState::RESTART;
      }
      info("[replication] Succeeded restoring the backup, fullsync was finish");
      post_fullsync_cb_();

      // It needs to reload namespaces from DB after the full sync is done,
      // or namespaces are not visible in the replica.
      s = srv_->GetNamespace()->LoadAndRewrite();
      if (!s.IsOK()) {
        // Logged only: the full sync itself has already succeeded
        error("[replication] Failed to load and rewrite namespace: {}", s.Msg());
      }

      // Switch to psync state machine again
      psync_steps_.Start();
      return CBState::QUIT;
    }
  }

  unreachable();
}