ReplicationThread::CBState ReplicationThread::fullSyncReadCB()

in src/cluster/replication.cc [622:756]


// Read callback driving the full-sync state machine (`fullsync_state_`).
//
// The states are processed in order and deliberately fall through to the next
// one as soon as the current one has been satisfied:
//   kFetchMetaID      -> reads the meta id line (only when `master_use_repl_port` is set)
//   kFetchMetaSize    -> reads the meta file size line
//   kFetchMetaContent -> reads the meta content / file list, fetches the files,
//                        restores the DB and hands control back to the psync steps
//
// Returns:
//   CBState::AGAIN   - the state cannot make progress with the data currently buffered
//   CBState::RESTART - an error reply, an invalid value or a failed step was encountered
//   CBState::QUIT    - the full sync finished and `psync_steps_` has been started
ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
  auto input = bufferevent_get_input(bev);
  switch (fullsync_state_) {
    case kFetchMetaID: {
      // New version master only sends meta file content
      if (!srv_->GetConfig()->master_use_repl_port) {
        fullsync_state_ = kFetchMetaContent;
        return CBState::AGAIN;
      }
      UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
      if (!line) return CBState::AGAIN;
      // A leading '-' is treated as an error reply from the master
      if (line[0] == '-') {
        error("[replication] Failed to fetch meta id: {}", line.get());
        return CBState::RESTART;
      }
      fullsync_meta_id_ = static_cast<rocksdb::BackupID>(line.length > 0 ? std::strtoul(line.get(), nullptr, 10) : 0);
      // 0 covers both an empty line and a line that does not start with a number
      if (fullsync_meta_id_ == 0) {
        error("[replication] Invalid meta id received");
        return CBState::RESTART;
      }
      fullsync_state_ = kFetchMetaSize;
      info("[replication] Succeed fetching meta id: {}", fullsync_meta_id_);
      // Intentionally continue with the meta size, which may already be buffered
      [[fallthrough]];
    }
    case kFetchMetaSize: {
      UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
      if (!line) return CBState::AGAIN;
      if (line[0] == '-') {
        error("[replication] Failed to fetch meta size: {}", line.get());
        return CBState::RESTART;
      }
      fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(), nullptr, 10) : 0;
      if (fullsync_filesize_ == 0) {
        error("[replication] Invalid meta file size received");
        return CBState::RESTART;
      }
      fullsync_state_ = kFetchMetaContent;
      info("[replication] Succeed fetching meta size: {}", fullsync_filesize_);
      // Intentionally continue with the meta content, which may already be buffered
      [[fallthrough]];
    }
    case kFetchMetaContent: {
      std::string target_dir;
      engine::Storage::ReplDataManager::MetaInfo meta;
      // Master using old version
      if (srv_->GetConfig()->master_use_repl_port) {
        // Wait until the whole meta file (as announced by the size line) is buffered
        if (evbuffer_get_length(input) < fullsync_filesize_) {
          return CBState::AGAIN;
        }
        auto s = engine::Storage::ReplDataManager::ParseMetaAndSave(storage_, fullsync_meta_id_, input, &meta);
        if (!s.IsOK()) {
          error("[replication] Failed to parse meta and save: {}", s.Msg());
          // NOTE(review): every other failure path returns RESTART; AGAIN looks
          // suspicious if ParseMetaAndSave already consumed part of `input` —
          // confirm this is intended before changing it.
          return CBState::AGAIN;
        }
        target_dir = srv_->GetConfig()->backup_sync_dir;
      } else {
        // Master using new version: a single line holding a comma separated file list
        UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
        if (!line) return CBState::AGAIN;
        if (line[0] == '-') {
          error("[replication] Failed to fetch meta info: {}", line.get());
          return CBState::RESTART;
        }
        std::vector<std::string> need_files = util::Split(std::string(line.get()), ",");
        // Every listed file (including "CURRENT") is scheduled for fetching
        for (const auto &f : need_files) {
          meta.files.emplace_back(f, 0);
        }

        target_dir = srv_->GetConfig()->sync_checkpoint_dir;
        // Clean invalid files of checkpoint, "CURRENT" file must be invalid
        // because we identify one file by its file number but only "CURRENT"
        // file doesn't have number.
        auto iter = std::find(need_files.begin(), need_files.end(), "CURRENT");
        if (iter != need_files.end()) need_files.erase(iter);
        auto s = engine::Storage::ReplDataManager::CleanInvalidFiles(storage_, target_dir, need_files);
        if (!s.IsOK()) {
          warn("[replication] Failed to clean up invalid files of the old checkpoint, error: {}", s.Msg());
          warn("[replication] Try to clean all checkpoint files");
          // Best effort: a failure here is only logged and the sync continues
          auto destroy_status = rocksdb::DestroyDB(target_dir, rocksdb::Options());
          if (!destroy_status.ok()) {
            warn("[replication] Failed to clean all checkpoint files, error: {}", destroy_status.ToString());
          }
        }
      }
      assert(evbuffer_get_length(input) == 0);
      // Reset the state machine so that a later full sync starts from the first state
      fullsync_state_ = kFetchMetaID;
      info("[replication] Succeeded fetching full data files info, fetching files in parallel");

      bool pre_fullsync_done = false;
      // If 'slave-empty-db-before-fullsync' is yes, we call 'pre_fullsync_cb_'
      // just like reloading database. And we don't want slave to occupy too much
      // disk space, so we just empty entire database rudely.
      if (srv_->GetConfig()->slave_empty_db_before_fullsync) {
        if (!pre_fullsync_cb_()) return CBState::RESTART;
        pre_fullsync_done = true;
        storage_->EmptyDB();
      }

      repl_state_.store(kReplFetchSST, std::memory_order_relaxed);
      auto s = parallelFetchFile(target_dir, meta.files);
      if (!s.IsOK()) {
        // Pair the earlier 'pre_fullsync_cb_' call with 'post_fullsync_cb_' before bailing out
        if (pre_fullsync_done) post_fullsync_cb_();
        error("[replication] Failed to parallel fetch files while {}", s.Msg());
        return CBState::RESTART;
      }
      info("[replication] Succeeded fetching files in parallel, restoring the backup");

      // Don't need to call 'pre_fullsync_cb_' again if it was called before
      if (!pre_fullsync_done && !pre_fullsync_cb_()) return CBState::RESTART;

      // For old version, master uses rocksdb backup to implement data snapshot
      if (srv_->GetConfig()->master_use_repl_port) {
        s = storage_->RestoreFromBackup();
      } else {
        s = storage_->RestoreFromCheckpoint();
      }
      if (!s.IsOK()) {
        error("[replication] Failed to restore backup while {}, restart fullsync", s.Msg());
        post_fullsync_cb_();
        return CBState::RESTART;
      }
      info("[replication] Succeeded restoring the backup, fullsync was finish");
      post_fullsync_cb_();

      // It needs to reload namespaces from DB after the full sync is done,
      // or namespaces are not visible in the replica.
      s = srv_->GetNamespace()->LoadAndRewrite();
      if (!s.IsOK()) {
        // Only logged: the full sync itself is still considered finished
        error("[replication] Failed to load and rewrite namespace: {}", s.Msg());
      }

      // Switch to psync state machine again
      psync_steps_.Start();
      return CBState::QUIT;
    }
  }
  // Only reached if `fullsync_state_` holds a value not handled by the switch
  unreachable();
}