in src/storage/sync/ResyncWorker.cc [101:387]
// Re-synchronizes the successor replica of chain `vChainId` from the local target.
//
// Flow:
//   1. A scope guard clears the per-chain `isSyncing` flag (and stamps
//      `lastSyncingTime`) on every exit path.
//   2. Resolve the local target serving `vChainId`; build a ClientId whose uuid
//      bytes encode (vChainId, targetId) for the forwarded requests.
//   3. Send SyncStart to the successor and receive its chunk metadata.
//   4. Diff the remote metadata against the local metadata map:
//        - chunks present only remotely are queued for REMOVE;
//        - chunks present only locally are queued for WRITE;
//        - chunks present on both sides are classified by chain version, commit
//          state and (unless HEAVY full sync) checksum, and are then either queued
//          for WRITE, skipped, or flagged as a fatal inconsistency.
//      A fatal inconsistency force-offlines the successor and aborts the sync.
//      Local chunks in a non-NORMAL recycle state are neither forwarded nor removed.
//   5. Forward the REMOVEs, then the WRITEs, in batches of `config_.batch_size()`,
//      re-resolving the target before every batch.
//   6. Send SyncDone to the successor of the most recently resolved target.
//
// Returns Void on success, otherwise the first error encountered. A routing
// version mismatch on SyncStart dismisses the resync recorder guard and is
// counted separately in `resyncRoutingVersionMismatch`.
CoTryTask<void> ResyncWorker::handleSync(VersionedChainId vChainId) {
  auto fullSyncLevel = config_.full_sync_level();
  auto needFullSync = fullSyncLevel != FullSyncLevel::NONE &&
                      (config_.full_sync_chains().empty() || config_.full_sync_chains().contains(vChainId.chainId));
  // With HEAVY full sync, chunks that pass the version/state checks are forwarded
  // without comparing checksums.
  bool heavyFullSync = needFullSync && fullSyncLevel == FullSyncLevel::HEAVY;

  // 1. Cancel the syncing state on exit.
  auto syncingGuard = folly::makeGuard([&] {
    shards_.withLock(
        [&](SyncingChainIds &syncingChainIds) {
          XLOGF(DBG9, "sync exit chain {}", vChainId);
          auto &status = syncingChainIds[vChainId.chainId];
          status.isSyncing = false;
          status.lastSyncingTime = RelativeTime::now();
        },
        vChainId.chainId);
  });
  XLOGF(DBG9, "start sync chain {}", vChainId);

  // 2. find target and routing.
  auto targetResult = components_.targetMap.getByChainId(vChainId);
  if (UNLIKELY(!targetResult)) {
    auto msg = fmt::format("sync start {} get routing failed: {}", vChainId, targetResult.error());
    XLOG(ERR, msg);
    co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
  }
  auto target = std::move(*targetResult);
  auto targetId = target->targetId;

  // Forwarded requests carry a ClientId whose uuid bytes are (vChainId, targetId).
  ClientId clientId{};
  static_assert(sizeof(ClientId::uuid) == sizeof(VersionedChainId) + sizeof(TargetId));
  // NOTE(review): these stores type-pun the uuid byte buffer; std::memcpy would
  // sidestep alignment / strict-aliasing concerns.
  *reinterpret_cast<VersionedChainId *>(clientId.uuid.data) = vChainId;
  *reinterpret_cast<TargetId *>(clientId.uuid.data + sizeof(VersionedChainId)) = targetId;

  monitor::TagSet tag;
  tag.addTag("instance", fmt::format("{}-{}", targetId, vChainId.chainVer));

  // Per-run classification counters, reported to monitoring after the diff.
  uint32_t currentSyncingRemoteMissCount = 0;
  uint32_t currentSyncingRemoteChainVersionLowCount = 0;
  uint32_t currentSyncingRemoteChainVersionHighCount = 0;
  uint32_t currentSyncingRemoteUncommittedCount = 0;
  uint32_t currentSyncingLocalUncommittedCount = 0;
  uint32_t currentSyncingCommitVersionMismatchCount = 0;
  uint32_t currentSyncingCurrentChainIsWritingCount = 0;
  uint32_t currentSyncingRemoteFullSyncHeavyCount = 0;
  uint32_t currentSyncingRemoteFullSyncLightCount = 0;
  uint32_t currentSyncingSkipCount = 0;
  auto recordGuard = resyncRecorder.record(tag);
  auto remainingChunksCount = syncingRemainingChunksCount.getRecorderWithTag(tag);
  SCOPE_EXIT { remainingChunksCount->set(0); };

  // 3. sync start: fetch the successor's chunk metadata.
  net::UserRequestOptions options;
  options.timeout = config_.sync_start_timeout();
  std::vector<ChunkMeta> remoteMetas;
  auto addrResult = target->getSuccessorAddr();
  if (UNLIKELY(!addrResult)) {
    XLOGF(ERR, "sync start get successor addr error: {}", addrResult.error());
    co_return makeError(std::move(addrResult.error()));
  }
  {
    SyncStartReq syncStartReq;
    syncStartReq.vChainId = vChainId;
    auto syncStartResult = co_await components_.messenger.syncStart(*addrResult, syncStartReq, &options);
    if (UNLIKELY(!syncStartResult)) {
      if (syncStartResult.error().code() == StorageClientCode::kRoutingVersionMismatch) {
        // Routing changed under us: dismiss the recorder and count it separately.
        recordGuard.dismiss();
        resyncRoutingVersionMismatch.addSample(1);
        auto msg = fmt::format("sync start {} request failed: {}", vChainId, syncStartResult.error());
        XLOG(DBG9, msg);
        co_return makeError(std::move(syncStartResult.error()));
      }
      auto msg = fmt::format("sync start {} request failed: {}", vChainId, syncStartResult.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
    }
    remoteMetas = std::move(syncStartResult->metas);
  }

  // 4. syncing: diff remote metadata against the local metadata.
  std::unordered_map<ChunkId, ChunkMetadata> localMetas;
  auto metaResult = target->storageTarget->getAllMetadataMap(localMetas);
  if (UNLIKELY(!metaResult)) {
    XLOGF(ERR, "target invalid iterator {}, error {}", targetId, metaResult.error());
    co_return makeError(std::move(metaResult.error()));
  }
  // re-check that routing for this chain version is still resolvable.
  {
    auto recheckResult = components_.targetMap.getByChainId(vChainId);
    if (UNLIKELY(!recheckResult)) {
      auto msg = fmt::format("sync re-check {} get routing failed: {}", vChainId, recheckResult.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
    }
  }

  std::vector<std::pair<ChunkId, uint32_t>> writeList;
  std::vector<ChunkId> removeList;
  bool hasFatalEvents = false;
  for (auto &remoteMeta : remoteMetas) {
    // 4.1. check exists: a chunk missing locally must be removed remotely.
    auto it = localMetas.find(remoteMeta.chunkId);
    if (it == localMetas.end()) {
      removeList.push_back(remoteMeta.chunkId);
      continue;
    }
    // Whatever happens below, this chunk has been handled; anything left in
    // localMetas after the loop is a chunk the successor does not have.
    SCOPE_EXIT { localMetas.erase(it); };

    // 4.2. check recycle state.
    const auto &chunkId = it->first;
    const auto &meta = it->second;
    if (UNLIKELY(meta.recycleState != RecycleState::NORMAL)) {
      XLOGF(WARNING, "target {} chunk {} in recycle state: {}", targetId, chunkId, meta);
      syncingLocalChunkInRecycleState.addSample(1);
      continue;  // skip chunk in recycle state.
    }

    // 4.3. classify a chunk present on both sides.
    bool needForward = true;
    if (meta.chainVer > remoteMeta.chainVer) {
      ++currentSyncingRemoteChainVersionLowCount;
    } else if (remoteMeta.updateVer != remoteMeta.commitVer || remoteMeta.chunkState != ChunkState::COMMIT) {
      XLOGF(WARNING, "chain {} remote uncommitted {}", vChainId.chainId, remoteMeta);
      ++currentSyncingRemoteUncommittedCount;
    } else if (meta.chainVer < remoteMeta.chainVer) {
      if (meta.chunkState == ChunkState::COMMIT) {
        ++currentSyncingRemoteChainVersionHighCount;
        XLOGF(DFATAL, "chain {} remote chain version high, local {}, remote {}", vChainId, meta, remoteMeta);
        hasFatalEvents = true;
        break;
      } else {
        needForward = false;
        ++currentSyncingLocalUncommittedCount;
        XLOGF(CRITICAL, "chain {} local uncommitted, local {}, remote {}", vChainId, meta, remoteMeta);
      }
    } else if (meta.updateVer != remoteMeta.commitVer) {
      if (meta.chainVer != vChainId.chainVer && meta.chunkState == ChunkState::COMMIT) {
        ++currentSyncingCommitVersionMismatchCount;
        XLOGF(DFATAL, "chain {} commit version mismatch, local {}, remote {}", vChainId, meta, remoteMeta);
        hasFatalEvents = true;
        break;
      } else {
        needForward = false;
        ++currentSyncingCurrentChainIsWritingCount;
        XLOGF(CRITICAL, "chain {} chain is writing, local {}, remote {}", vChainId, meta, remoteMeta);
      }
    } else if (heavyFullSync) {
      ++currentSyncingRemoteFullSyncHeavyCount;
    } else if (meta.checksum() != remoteMeta.checksum) {
      if (meta.chainVer != vChainId.chainVer) {
        XLOGF(DFATAL, "chain {} checksum not equal, local {}, remote {}", vChainId, meta, remoteMeta);
        ++currentSyncingRemoteFullSyncLightCount;
        hasFatalEvents = true;
        break;
      } else {
        needForward = false;
        ++currentSyncingCurrentChainIsWritingCount;
        XLOGF(CRITICAL,
              "chain {} checksum not equal because of writing, local {}, remote {}",
              vChainId,
              meta,
              remoteMeta);
      }
    } else {
      needForward = false;
    }

    if (needForward) {
      writeList.emplace_back(chunkId, meta.innerFileId.chunkSize);
    } else {
      ++currentSyncingSkipCount;
    }
  }

  // Report the classification counters BEFORE the fatal check. The counters for the
  // fatal cases (remote chain version high, commit version mismatch, checksum
  // mismatch) are only incremented right before `break`, so reporting them after the
  // early return below would always drop them.
  syncingRemoteChainVersionLowCount.addSample(currentSyncingRemoteChainVersionLowCount, tag);
  syncingRemoteChainVersionHighCount.addSample(currentSyncingRemoteChainVersionHighCount, tag);
  syncingLocalUncommittedCount.addSample(currentSyncingLocalUncommittedCount, tag);
  syncingRemoteUncommittedCount.addSample(currentSyncingRemoteUncommittedCount, tag);
  syncingCommitVersionMismatchCount.addSample(currentSyncingCommitVersionMismatchCount, tag);
  syncingCurrentChainIsWritingCount.addSample(currentSyncingCurrentChainIsWritingCount, tag);
  syncingRemoteFullSyncHeavyCount.addSample(currentSyncingRemoteFullSyncHeavyCount, tag);
  syncingRemoteFullSyncLightCount.addSample(currentSyncingRemoteFullSyncLightCount, tag);
  syncingSkipCount.addSample(currentSyncingSkipCount, tag);

  if (UNLIKELY(hasFatalEvents)) {
    auto msg = fmt::format("sync {} has fatal events", vChainId);
    XLOG(CRITICAL, msg);
    // Force the successor offline rather than overwrite it with inconsistent data.
    OfflineTargetReq req;
    req.targetId = target->successor->targetInfo.targetId;
    req.force = true;
    CO_RETURN_AND_LOG_ON_ERROR(co_await components_.messenger.offlineTarget(*addrResult, req, &options));
    co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
  }

  // Chunks still in localMetas were not reported by the successor: write them.
  for (auto &[chunkId, meta] : localMetas) {
    writeList.emplace_back(chunkId, meta.innerFileId.chunkSize);
    ++currentSyncingRemoteMissCount;
  }
  syncingRemoteMissCount.addSample(currentSyncingRemoteMissCount, tag);

  // 5. forward removes, then writes, batch by batch.
  auto batchSize = config_.batch_size();
  if (UNLIKELY(batchSize == 0)) {
    batchSize = 1;  // a zero batch size would never advance the batch loops below.
  }
  auto remainingCount = writeList.size() + removeList.size();
  remainingChunksCount->set(remainingCount);

  for (auto batchStart = 0ul; batchStart < removeList.size(); batchStart += batchSize) {
    // Re-resolve the target before each batch; abort if the chain version is no
    // longer routable.
    auto batchTargetResult = components_.targetMap.getByChainId(vChainId);
    if (UNLIKELY(!batchTargetResult)) {
      auto msg = fmt::format("sync re-check {} get routing failed: {}", vChainId, batchTargetResult.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
    }
    target = std::move(*batchTargetResult);
    std::vector<CoTryTask<void>> batch;
    for (auto idx = batchStart; idx < removeList.size() && idx < batchStart + batchSize; ++idx) {
      batch.push_back(forward(target, tag, clientId, std::move(removeList[idx]), UpdateType::REMOVE, 0));
    }
    auto limiterGuard = batchConcurrencyLimiter_.lock(0);
    auto results = co_await folly::coro::collectAllRange(std::move(batch));
    for (auto &batchResult : results) {
      if (UNLIKELY(!batchResult)) {
        XLOGF(ERR, "target {} forward remove failed {}", targetId, batchResult.error());
        CO_RETURN_ERROR(batchResult);
      }
    }
    remainingCount -= results.size();
    remainingChunksCount->set(remainingCount);
  }

  for (auto batchStart = 0ul; batchStart < writeList.size(); batchStart += batchSize) {
    // Re-resolve the target before each batch; abort if the chain version is no
    // longer routable.
    auto batchTargetResult = components_.targetMap.getByChainId(vChainId);
    if (UNLIKELY(!batchTargetResult)) {
      auto msg = fmt::format("sync re-check {} get routing failed: {}", vChainId, batchTargetResult.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendStartFailed, std::move(msg));
    }
    target = std::move(*batchTargetResult);
    std::vector<CoTryTask<void>> batch;
    for (auto idx = batchStart; idx < writeList.size() && idx < batchStart + batchSize; ++idx) {
      auto &[chunkId, chunkSize] = writeList[idx];
      batch.push_back(forward(target, tag, clientId, std::move(chunkId), UpdateType::WRITE, chunkSize));
    }
    auto limiterGuard = batchConcurrencyLimiter_.lock(0);
    auto results = co_await folly::coro::collectAllRange(std::move(batch));
    for (auto &batchResult : results) {
      if (UNLIKELY(!batchResult)) {
        XLOGF(ERR, "target {} forward write failed {}", targetId, batchResult.error());
        CO_RETURN_ERROR(batchResult);
      }
    }
    remainingCount -= results.size();
    remainingChunksCount->set(remainingCount);
  }

  // 6. sync done, sent to the successor of the most recently resolved target.
  {
    SyncDoneReq syncDoneReq;
    syncDoneReq.vChainId = vChainId;
    auto doneAddrResult = target->getSuccessorAddr();
    if (UNLIKELY(!doneAddrResult)) {
      XLOGF(ERR, "sync done get successor addr error: {}", doneAddrResult.error());
      co_return makeError(std::move(doneAddrResult.error()));
    }
    auto syncDoneResult = co_await components_.messenger.syncDone(*doneAddrResult, syncDoneReq);
    if (UNLIKELY(!syncDoneResult)) {
      auto msg = fmt::format("sync done {} request failed: {}", vChainId, syncDoneResult.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendDoneFailed, std::move(msg));
    }
    if (UNLIKELY(!syncDoneResult->result.lengthInfo)) {
      auto msg = fmt::format("sync done {} request failed: {}", vChainId, syncDoneResult->result.lengthInfo.error());
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kSyncSendDoneFailed, std::move(msg));
    }
  }

  recordGuard.succ();
  XLOGF(INFO,
        "sync done chain {} target {} update {} remove {}",
        vChainId,
        targetId,
        writeList.size(),
        removeList.size());
  co_return Void{};
}