in src/storage/service/StorageOperator.cc [333:514]
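// Handle a single update (write or remove) on this storage target as part of the
// replication chain: validate the request, serialize on the per-chunk lock, apply the
// update to the local chunk, forward it to the successor, cross-check checksums, and
// finally commit the new chunk version locally.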
CoTask<IOResult> StorageOperator::handleUpdate(ServiceRequestContext &requestCtx,
UpdateReq &req,
net::IBSocket *ibSocket,
TargetPtr &target) {
// 1. get target.
if (UNLIKELY(req.options.fromClient && !target->isHead)) {
XLOGF(ERR, "non-head node receive a client update request");
co_return makeError(StorageClientCode::kRoutingError, "non-head node receive a client update request");
}
if (UNLIKELY(req.options.fromClient && config_.read_only())) {
auto msg = fmt::format("storage is readonly!");
XLOG(ERR, msg);
co_return makeError(StatusCode::kReadOnlyMode, std::move(msg));
}
if (UNLIKELY(req.payload.key.chunkId.data().empty())) {
auto msg = fmt::format("update request with empty chunk id: {}", req);
XLOG(ERR, msg);
co_return makeError(StatusCode::kInvalidArg, std::move(msg));
}
XLOGF(DBG1, "Start the replication process, target: {}, tag: {}, req: {}", target->targetId, req.tag, req);
const auto &appInfo = components_.getAppInfo();
auto trace = storageEventTrace_.newEntry(StorageEventTrace{
.clusterId = appInfo.clusterId,
.nodeId = appInfo.nodeId,
.targetId = target->targetId,
.updateReq = req,
});
// 2. lock chunk.
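// updates to the same chunk must be serialized; the request tag identifies the current
// lock holder and is logged when another request has to wait.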
folly::coro::Baton baton;
auto recordGuard = waitChunkLockRecorder.record();
auto lockGuard = target->storageTarget->lockChunk(baton, req.payload.key.chunkId, fmt::to_string(req.tag));
if (!lockGuard.locked()) {
XLOGF(DBG1,
"write wait lock on chunk {}, current owner: {}, req: {}",
req.payload.key.chunkId,
lockGuard.currentTag(),
req);
co_await lockGuard.lock();
}
recordGuard.report(true);
// re-check chain version after acquiring the lock.
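// the chain may have changed while this request was waiting for the lock, so resolve the
// target again from the latest routing information before applying the update.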
auto targetResult = components_.targetMap.getByChainId(req.payload.key.vChainId);
if (UNLIKELY(!targetResult)) {
co_return makeError(std::move(targetResult.error()));
}
target = std::move(*targetResult);
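// chunkEngineJob carries the state of the pending chunk-engine update and is shared by
// doUpdate, the forwarding path and doCommit below.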
ChunkEngineUpdateJob chunkEngineJob{};
// 3. update local target.
auto buffer = components_.rdmabufPool.get();
net::RDMARemoteBuf remoteBuf;
auto updateResult = co_await doUpdate(requestCtx,
req.payload,
req.options,
req.featureFlags,
target->storageTarget,
ibSocket,
buffer,
remoteBuf,
chunkEngineJob,
!(req.options.fromClient && target->rejectCreateChunk));
trace->updateRes = updateResult;
uint32_t code = updateResult.lengthInfo ? 0 : updateResult.lengthInfo.error().code();
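// dispatch on the local update result: only success and a stale update fall through to the
// forwarding and commit steps; all other cases return early.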
if (code == 0) {
// 1. write success.
if (req.payload.updateVer == 0) {
req.payload.updateVer = updateResult.updateVer;
} else if (UNLIKELY(req.payload.updateVer != updateResult.updateVer)) {
auto msg = fmt::format("write update version mismatch, req {}, result {}", req, updateResult);
XLOG(DFATAL, msg);
co_return makeError(StorageCode::kChunkVersionMismatch, std::move(msg));
}
} else if (code == StorageCode::kChunkMissingUpdate) {
// 2. missing update.
XLOGF(DFATAL, "write missing update and block, req {}, result {}", req, updateResult);
co_return updateResult;
} else if (code == StorageCode::kChunkCommittedUpdate) {
// 3. committed update, considered as a successful write.
updateResult.lengthInfo = req.payload.length;
updateResult.updateVer = req.payload.updateVer;
updateResult.commitVer = req.payload.updateVer;
XLOGF(DFATAL, "write committed update, req {}, result {}", req, updateResult);
co_return updateResult;
} else if (code == StorageCode::kChunkStaleUpdate) {
// 4. stale update, considered as a successful write.
updateResult.lengthInfo = req.payload.length;
updateResult.updateVer = req.payload.updateVer;
XLOGF(CRITICAL, "write stale update, req {}, result {}", req, updateResult);
} else if (code == StorageCode::kChunkAdvanceUpdate) {
// 5. advance update.
XLOGF(DFATAL, "write advance update, req {}, result {}", req, updateResult);
co_return updateResult;
} else {
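// 6. other update failures.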
XLOGF(CRITICAL, "write update failed, req {}, result {}", req, updateResult);
co_return updateResult;
}
XLOGF(DBG1, "Updated local chunk, target: {}, tag: {}, result: {}", target->targetId, req.tag, updateResult);
// 4. forward to successor.
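// commitIO describes what will be committed on this target; the forwarding path may adjust
// it (e.g. mark it as syncing), hence the commit version re-check below.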
CommitIO commitIO;
commitIO.key = req.payload.key;
commitIO.commitVer = updateResult.updateVer;
commitIO.isRemove = req.payload.isRemove();
auto forwardResult = co_await components_.reliableForwarding
.forwardWithRetry(requestCtx, req, remoteBuf, chunkEngineJob, target, commitIO);
if (UNLIKELY(commitIO.commitVer != updateResult.updateVer)) {
auto msg = fmt::format("commit version mismatch, req: {}, successor {} != local {}",
req,
commitIO.commitVer,
updateResult.updateVer);
XLOG(DFATAL, msg);
co_return makeError(StorageCode::kChunkVersionMismatch, std::move(msg));
}
XLOGF(DBG1,
"Forwarded update to successor {}, target: {}, tag: {}, result: {}",
(target->successor ? target->successor->targetInfo.targetId : TargetId{0}),
target->targetId,
req.tag,
forwardResult);
trace->forwardRes = forwardResult;
trace->commitIO = commitIO;
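// cross-check the checksum computed locally against the one returned by the successor; a
// mismatch normally indicates replica divergence and fails the request.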
if (forwardResult.lengthInfo) {
if (commitIO.isRemove && (forwardResult.checksum.type == ChecksumType::NONE ||
updateResult.checksum.type == ChecksumType::NONE || commitIO.isSyncing)) {
// Known issue: during a remove, one side may hit a "chunk not found" situation and report no
// checksum (or the chunk is still syncing), so the checksum difference is logged but tolerated.
XLOGF(INFO,
"Remove op local checksum {} not equal to checksum {} generated by successor, key: {}, syncing: {}",
updateResult.checksum,
forwardResult.checksum,
req.payload.key,
commitIO.isSyncing);
} else if (forwardResult.checksum != updateResult.checksum) {
auto msg = fmt::format("Local checksum {} not equal to checksum {} generated by successor, key: {}",
updateResult.checksum,
forwardResult.checksum,
req.payload.key);
XLOG_IF(DFATAL, !requestCtx.debugFlags.faultInjectionEnabled(), msg);
co_return makeError(StorageClientCode::kChecksumMismatch, std::move(msg));
}
} else if (forwardResult.lengthInfo.error().code() != StorageCode::kNoSuccessorTarget) {
co_return forwardResult;
}
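// kNoSuccessorTarget means this target is the tail of the chain, so there is nothing to
// forward and the commit proceeds locally.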
// 5. commit.
auto commitResult =
co_await doCommit(requestCtx, commitIO, req.options, chunkEngineJob, req.featureFlags, target->storageTarget);
code = commitResult.lengthInfo ? 0 : commitResult.lengthInfo.error().code();
if (LIKELY(code == 0)) {
// 1. commit success.
} else if (code == StorageCode::kChunkStaleCommit) {
// 2. stale commit, considered as a successful commit.
XLOGF(INFO, "write stale commit, req {}, result {}", req, commitResult);
commitResult.commitVer = updateResult.updateVer;
} else {
// 3. commit fail.
XLOGF(ERR, "write commit fail, req {}, result {}", req, commitResult);
co_return commitResult;
}
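// return the local update's length and checksum to the caller rather than the values from
// the commit result.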
commitResult.lengthInfo = updateResult.lengthInfo;
commitResult.checksum = updateResult.checksum;
XLOGF(DBG1, "Committed local chunk, target: {}, tag: {}, result: {}", target->targetId, req.tag, commitResult);
trace->commitRes = commitResult;
co_return commitResult;
}