CoTryTask ResyncWorker::handleSync()

in src/storage/sync/ResyncWorker.cc [101:387]


// Resynchronize chain `vChainId` from the local target to its successor.
//
// Steps, as implemented below:
//   1. Send SyncStart to the successor and receive its chunk metadata list.
//   2. Load all local chunk metadata and diff it against the remote list:
//        - chunk present only remotely -> forward UpdateType::REMOVE;
//        - chunk present only locally  -> forward UpdateType::WRITE;
//        - chunk present on both sides -> forward WRITE or skip, per the decision table in the loop.
//      Some combinations are treated as fatal (DFATAL). In that case the successor is forced
//      offline and kSyncSendStartFailed is returned without forwarding anything.
//   3. Forward all removes, then all writes, in batches of `config_.batch_size()`. The routing
//      for the chain is re-resolved before every batch.
//   4. Send SyncDone to the successor.
// On every exit path the chain's syncing flag is cleared and its lastSyncingTime is refreshed.
CoTryTask<void> ResyncWorker::handleSync(VersionedChainId vChainId) {
  auto fullSyncLevel = config_.full_sync_level();
  auto needFullSync = fullSyncLevel != FullSyncLevel::NONE &&
                      (config_.full_sync_chains().empty() || config_.full_sync_chains().contains(vChainId.chainId));
  // In HEAVY mode, chunks whose versions already agree are forwarded unconditionally instead of
  // being compared by checksum.
  bool heavyFullSync = needFullSync && fullSyncLevel == FullSyncLevel::HEAVY;

  // 1. Cancel the syncing state on exit.
  auto guard = folly::makeGuard([&] {
    shards_.withLock(
        [&](SyncingChainIds &syncingChainIds) {
          XLOGF(DBG9, "sync exit chain {}", vChainId);
          auto &status = syncingChainIds[vChainId.chainId];
          status.isSyncing = false;
          status.lastSyncingTime = RelativeTime::now();
        },
        vChainId.chainId);
  });
  XLOGF(DBG9, "start sync chain {}", vChainId);

  // 2. find target and routing.
  auto targetResult = components_.targetMap.getByChainId(vChainId);
  if (UNLIKELY(!targetResult)) {
    auto msg = fmt::format("sync start {} get routing failed: {}", vChainId, targetResult.error());
    XLOG(ERR, msg);
    co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
  }
  auto target = std::move(*targetResult);
  auto targetId = target->targetId;

  // The client id used for forwarded updates packs vChainId followed by targetId into the uuid bytes.
  // NOTE(review): writing through reinterpret_cast into the uuid byte buffer relies on the buffer
  // being suitably aligned and sidesteps strict-aliasing rules; std::memcpy would be the portable
  // form — confirm before changing.
  ClientId clientId{};
  static_assert(sizeof(ClientId::uuid) == sizeof(VersionedChainId) + sizeof(TargetId));
  *reinterpret_cast<VersionedChainId *>(clientId.uuid.data) = vChainId;
  *reinterpret_cast<TargetId *>(clientId.uuid.data + sizeof(VersionedChainId)) = targetId;

  monitor::TagSet tag;
  tag.addTag("instance", fmt::format("{}-{}", targetId, vChainId.chainVer));
  uint32_t currentSyncingRemoteMissCount = 0;
  uint32_t currentSyncingRemoteChainVersionLowCount = 0;
  uint32_t currentSyncingRemoteChainVersionHighCount = 0;
  uint32_t currentSyncingRemoteUncommittedCount = 0;
  uint32_t currentSyncingLocalUncommittedCount = 0;
  uint32_t currentSyncingCommitVersionMismatchCount = 0;
  uint32_t currentSyncingCurrentChainIsWritingCount = 0;
  uint32_t currentSyncingRemoteFullSyncHeavyCount = 0;
  uint32_t currentSyncingRemoteFullSyncLightCount = 0;
  uint32_t currentSyncingSkipCount = 0;
  auto recordGuard = resyncRecorder.record(tag);

  // Publish the per-sync counters gathered by the diff below. This is called on the fatal-event
  // path as well as on the normal path. The "remote chain version high", "commit version mismatch"
  // and "checksum not equal" counters are only incremented right before a fatal `break`, so
  // reporting only on the normal path would always publish 0 for them.
  auto reportSyncingCounters = [&] {
    syncingRemoteMissCount.addSample(currentSyncingRemoteMissCount, tag);
    syncingRemoteChainVersionLowCount.addSample(currentSyncingRemoteChainVersionLowCount, tag);
    syncingRemoteChainVersionHighCount.addSample(currentSyncingRemoteChainVersionHighCount, tag);
    syncingLocalUncommittedCount.addSample(currentSyncingLocalUncommittedCount, tag);
    syncingRemoteUncommittedCount.addSample(currentSyncingRemoteUncommittedCount, tag);
    syncingCommitVersionMismatchCount.addSample(currentSyncingCommitVersionMismatchCount, tag);
    syncingCurrentChainIsWritingCount.addSample(currentSyncingCurrentChainIsWritingCount, tag);
    syncingRemoteFullSyncHeavyCount.addSample(currentSyncingRemoteFullSyncHeavyCount, tag);
    syncingRemoteFullSyncLightCount.addSample(currentSyncingRemoteFullSyncLightCount, tag);
    syncingSkipCount.addSample(currentSyncingSkipCount, tag);
  };

  auto remainingChunksCount = syncingRemainingChunksCount.getRecorderWithTag(tag);
  SCOPE_EXIT { remainingChunksCount->set(0); };

  // 3. sync start.
  net::UserRequestOptions options;
  options.timeout = config_.sync_start_timeout();
  std::vector<ChunkMeta> remoteMetas;

  auto addrResult = target->getSuccessorAddr();
  if (UNLIKELY(!addrResult)) {
    XLOGF(ERR, "sync start get successor addr error: {}", addrResult.error());
    co_return makeError(std::move(addrResult.error()));
  }
  {
    SyncStartReq syncStartReq;
    syncStartReq.vChainId = vChainId;

    auto syncStartResult = co_await components_.messenger.syncStart(*addrResult, syncStartReq, &options);
    if (UNLIKELY(!syncStartResult)) {
      if (syncStartResult.error().code() == StorageClientCode::kRoutingVersionMismatch) {
        // A routing version mismatch is counted separately and is not recorded by resyncRecorder.
        recordGuard.dismiss();
        resyncRoutingVersionMismatch.addSample(1);
        auto msg = fmt::format("sync start {} request failed: {}", vChainId, syncStartResult.error());
        XLOG(DBG9, msg);
        co_return makeError(std::move(syncStartResult.error()));
      }
      auto msg = fmt::format("sync start {} request failed: {}", vChainId, syncStartResult.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
    }

    remoteMetas = std::move(syncStartResult->metas);
  }

  // 3. syncing.
  std::unordered_map<ChunkId, ChunkMetadata> localMetas;
  auto result = target->storageTarget->getAllMetadataMap(localMetas);
  if (UNLIKELY(!result)) {
    XLOGF(ERR, "target invalid iterator {}, error {}", targetId, result.error());
    co_return makeError(std::move(result.error()));
  }
  // re-check current chain version.
  {
    auto recheckResult = components_.targetMap.getByChainId(vChainId);
    if (UNLIKELY(!recheckResult)) {
      auto msg = fmt::format("sync re-check {} get routing failed: {}", vChainId, recheckResult.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
    }
  }
  std::vector<std::pair<ChunkId, uint32_t>> writeList;
  std::vector<ChunkId> removeList;

  bool hasFatalEvents = false;
  for (auto &remoteMeta : remoteMetas) {
    // 1. check exists: a chunk the successor has but we don't must be removed remotely.
    auto it = localMetas.find(remoteMeta.chunkId);
    if (it == localMetas.end()) {
      removeList.push_back(remoteMeta.chunkId);
      continue;
    }
    // Whatever is decided below, this chunk has been handled; whatever remains in localMetas after
    // the loop is missing on the successor.
    SCOPE_EXIT { localMetas.erase(it); };

    // 2. check recycle state.
    const auto &chunkId = it->first;
    const auto &meta = it->second;
    if (UNLIKELY(meta.recycleState != RecycleState::NORMAL)) {
      XLOGF(WARNING, "target {} chunk {} in recycle state: {}", targetId, chunkId, meta);
      syncingLocalChunkInRecycleState.addSample(1);
      continue;  // skip chunk in recycle state.
    }

    // 3. Compare local and remote metadata. The first matching case wins:
    //    - local chain version newer than remote         -> forward;
    //    - remote not committed                          -> forward;
    //    - local chain version older than remote         -> fatal if local committed, else skip;
    //    - local updateVer != remote commitVer           -> fatal if local committed under an older
    //                                                       chain version, else skip (being written);
    //    - HEAVY full sync                               -> forward;
    //    - checksums differ                              -> fatal if local chain version is not the
    //                                                       current one, else skip (being written);
    //    - otherwise (identical)                         -> skip.
    bool needForward = true;
    if (meta.chainVer > remoteMeta.chainVer) {
      ++currentSyncingRemoteChainVersionLowCount;
    } else if (remoteMeta.updateVer != remoteMeta.commitVer || remoteMeta.chunkState != ChunkState::COMMIT) {
      XLOGF(WARNING, "chain {} remote uncommitted {}", vChainId.chainId, remoteMeta);
      ++currentSyncingRemoteUncommittedCount;
    } else if (meta.chainVer < remoteMeta.chainVer) {
      if (meta.chunkState == ChunkState::COMMIT) {
        ++currentSyncingRemoteChainVersionHighCount;
        XLOGF(DFATAL, "chain {} remote chain version high, local {}, remote {}", vChainId, meta, remoteMeta);
        hasFatalEvents = true;
        break;
      } else {
        needForward = false;
        ++currentSyncingLocalUncommittedCount;
        XLOGF(CRITICAL, "chain {} local uncommitted, local {}, remote {}", vChainId, meta, remoteMeta);
      }
    } else if (meta.updateVer != remoteMeta.commitVer) {
      if (meta.chainVer != vChainId.chainVer && meta.chunkState == ChunkState::COMMIT) {
        ++currentSyncingCommitVersionMismatchCount;
        XLOGF(DFATAL, "chain {} commit version mismatch, local {}, remote {}", vChainId, meta, remoteMeta);
        hasFatalEvents = true;
        break;
      } else {
        needForward = false;
        ++currentSyncingCurrentChainIsWritingCount;
        XLOGF(CRITICAL, "chain {} chain is writing, local {}, remote {}", vChainId, meta, remoteMeta);
      }
    } else if (heavyFullSync) {
      ++currentSyncingRemoteFullSyncHeavyCount;
    } else if (meta.checksum() != remoteMeta.checksum) {
      if (meta.chainVer != vChainId.chainVer) {
        XLOGF(DFATAL, "chain {} checksum not equal, local {}, remote {}", vChainId, meta, remoteMeta);
        ++currentSyncingRemoteFullSyncLightCount;
        hasFatalEvents = true;
        break;
      } else {
        needForward = false;
        ++currentSyncingCurrentChainIsWritingCount;
        XLOGF(CRITICAL,
              "chain {} checksum not equal because of writing, local {}, remote {}",
              vChainId,
              meta,
              remoteMeta);
      }
    } else {
      needForward = false;
    }
    if (needForward) {
      writeList.emplace_back(chunkId, meta.innerFileId.chunkSize);
    } else {
      ++currentSyncingSkipCount;
    }
  }

  if (UNLIKELY(hasFatalEvents)) {
    // Report what the diff observed (including the counter that triggered the fatal event). The
    // remote-miss count is not computed on this path and is reported as 0.
    reportSyncingCounters();

    auto msg = fmt::format("sync {} has fatal events", vChainId);
    XLOG(CRITICAL, msg);

    OfflineTargetReq req;
    req.targetId = target->successor->targetInfo.targetId;
    req.force = true;
    CO_RETURN_AND_LOG_ON_ERROR(co_await components_.messenger.offlineTarget(*addrResult, req, &options));

    co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
  }

  // Chunks left in localMetas were never reported by the successor: write them in full.
  for (auto &[chunkId, meta] : localMetas) {
    writeList.emplace_back(chunkId, meta.innerFileId.chunkSize);
    ++currentSyncingRemoteMissCount;
  }

  reportSyncingCounters();

  auto batchSize = config_.batch_size();
  auto remainingCount = writeList.size() + removeList.size();
  remainingChunksCount->set(remainingCount);
  for (auto batchStart = 0ul; batchStart < removeList.size(); batchStart += batchSize) {
    // Re-resolve routing so each batch is forwarded with the latest target information.
    auto batchTargetResult = components_.targetMap.getByChainId(vChainId);
    if (UNLIKELY(!batchTargetResult)) {
      auto msg = fmt::format("sync re-check {} get routing failed: {}", vChainId, batchTargetResult.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
    }
    target = std::move(*batchTargetResult);
    std::vector<CoTryTask<void>> batch;
    for (auto idx = batchStart; idx < removeList.size() && idx < batchStart + batchSize; ++idx) {
      batch.push_back(forward(target, tag, clientId, std::move(removeList[idx]), UpdateType::REMOVE, 0));
    }
    auto limiterGuard = batchConcurrencyLimiter_.lock(0);
    auto results = co_await folly::coro::collectAllRange(std::move(batch));
    for (auto &result : results) {
      if (UNLIKELY(!result)) {
        XLOGF(ERR, "target {} forward remove failed {}", targetId, result.error());
        CO_RETURN_ERROR(result);
      }
    }
    remainingCount -= results.size();
    remainingChunksCount->set(remainingCount);
  }
  for (auto batchStart = 0ul; batchStart < writeList.size(); batchStart += batchSize) {
    // Re-resolve routing so each batch is forwarded with the latest target information.
    auto batchTargetResult = components_.targetMap.getByChainId(vChainId);
    if (UNLIKELY(!batchTargetResult)) {
      auto msg = fmt::format("sync re-check {} get routing failed: {}", vChainId, batchTargetResult.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
    }
    target = std::move(*batchTargetResult);
    std::vector<CoTryTask<void>> batch;
    for (auto idx = batchStart; idx < writeList.size() && idx < batchStart + batchSize; ++idx) {
      auto &[chunkId, chunkSize] = writeList[idx];
      batch.push_back(forward(target, tag, clientId, std::move(chunkId), UpdateType::WRITE, chunkSize));
    }
    auto limiterGuard = batchConcurrencyLimiter_.lock(0);
    auto results = co_await folly::coro::collectAllRange(std::move(batch));
    for (auto &result : results) {
      if (UNLIKELY(!result)) {
        XLOGF(ERR, "target {} forward write failed {}", targetId, result.error());
        CO_RETURN_ERROR(result);
      }
    }
    remainingCount -= results.size();
    remainingChunksCount->set(remainingCount);
  }

  // 4. sync done.
  {
    SyncDoneReq syncDoneReq;
    syncDoneReq.vChainId = vChainId;

    // `target` may have been refreshed by the batch loops above, so resolve the successor again.
    auto doneAddrResult = target->getSuccessorAddr();
    if (UNLIKELY(!doneAddrResult)) {
      XLOGF(ERR, "sync done get successor addr error: {}", doneAddrResult.error());
      co_return makeError(std::move(doneAddrResult.error()));
    }
    auto syncDoneResult = co_await components_.messenger.syncDone(*doneAddrResult, syncDoneReq);
    if (UNLIKELY(!syncDoneResult)) {
      auto msg = fmt::format("sync done {} request failed: {}", vChainId, syncDoneResult.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendDoneFailed, std::move(msg));
    }
    if (UNLIKELY(!syncDoneResult->result.lengthInfo)) {
      auto msg = fmt::format("sync done {} request failed: {}", vChainId, syncDoneResult->result.lengthInfo.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendDoneFailed, std::move(msg));
    }
  }

  recordGuard.succ();
  XLOGF(INFO,
        "sync done chain {} target {} update {} remove {}",
        vChainId,
        targetId,
        writeList.size(),
        removeList.size());
  co_return Void{};
}