in src/storage/service/ReliableForwarding.cc [138:279]
// Forwards one update request to the successor of `target` and returns the successor's IOResult.
//
// A private copy of `req` is rewritten for forwarding (not from client, stamped with `retryCount`,
// `rdmabuf` and the target's chain version). If the successor is in SYNCING state and the request
// qualifies (see `readForSyncing` below), the whole chunk is first read locally and the forwarded
// request is replaced by a WRITE of that data starting at offset 0.
//
// Parameters:
//   req            - the original update request; never modified, only copied.
//   rdmabuf        - remote buffer placed into the forwarded request's payload.
//   chunkEngineJob - if it carries a chunk, that chunk is handed to the local read; if it does not,
//                    the chunk's update version is queried from the storage target while syncing.
//   retryCount     - written into the forwarded request.
//   target         - supplies the chain version, successor state/address, storage target and target id.
//   isSyncing      - out-parameter: set to whether the successor's public state is SYNCING.
//   timeout        - timeout passed to the update RPC via the request options.
//
// Returns the successor's result when the RPC itself succeeded (this includes an error reported
// inside that result). Returns an error result when buffer allocation, the local read, the chunk
// query, the successor address lookup, the RPC, or the chain version check fails.
CoTask<IOResult> ReliableForwarding::doForward(const UpdateReq &req,
const net::RDMARemoteBuf &rdmabuf,
const ChunkEngineUpdateJob &chunkEngineJob,
uint32_t retryCount,
const Target &target,
bool &isSyncing,
std::chrono::milliseconds timeout) {
// Work on a copy so the caller's request stays untouched.
UpdateReq updateReq = req;
updateReq.options.fromClient = false;
updateReq.retryCount = retryCount;
updateReq.payload.rdmabuf = rdmabuf;
updateReq.payload.key.vChainId.chainVer = target.vChainId.chainVer;
// `readBuf` below is allocated from this object and later referenced by the forwarded rdmabuf.
// NOTE(review): presumably declared at function scope so that memory stays valid until the
// update RPC has completed -- confirm against the buffer pool's ownership semantics.
auto buffer = components_.rdmabufPool.get();
isSyncing = target.successor->targetInfo.publicState == hf3fs::flat::PublicTargetState::SYNCING;
if (isSyncing) {
updateReq.options.isSyncing = true;
updateReq.options.commitChainVer = target.vChainId.chainVer;
}
// Read the full chunk locally only when the successor is syncing, the payload reports
// isWriteTruncateExtend(), and the request is either itself a syncing request or its length
// differs from the chunk size.
bool readForSyncing = req.payload.isWriteTruncateExtend() && isSyncing &&
(req.options.isSyncing || req.payload.length != req.payload.chunkSize);
if (readForSyncing) {
// Scoped to this branch; only marked successful at the end of the branch.
auto recordGuard = syncingReadRecorder.record();
// read the entire chunk.
IOResult readResult;
// Try tryAllocate() first and fall back to awaiting allocate() if that fails.
auto allocateResult = buffer.tryAllocate(req.payload.chunkSize);
if (UNLIKELY(!allocateResult)) {
allocateResult = co_await buffer.allocate(req.payload.chunkSize);
}
if (UNLIKELY(!allocateResult)) {
// Both allocation attempts failed: report the allocation error through the result.
readResult.lengthInfo = makeError(std::move(allocateResult.error()));
co_return readResult;
}
auto &readBuf = *allocateResult;
// Describe a read of chunkSize bytes from offset 0 for the same chunk key that is being forwarded.
ReadIO payload;
payload.key = updateReq.payload.key;
payload.offset = 0;
payload.length = req.payload.chunkSize;
BatchReadJob batch(payload, target.storageTarget.get(), readResult, req.payload.checksum.type);
batch.setRecalculateChecksum();
batch.front().state().localbuf = readBuf;
batch.front().state().bufferIndex = buffer.index();
batch.front().state().readUncommitted = true;
if (chunkEngineJob.chunk()) {
// Hand the chunk already held by the update job to the read job.
batch.front().state().chunkEngineJob.set(nullptr, chunkEngineJob.chunk()->raw_chunk());
}
// Submit the read and wait for it to finish before inspecting readResult.
co_await components_.aioReadWorker.enqueue(&batch);
co_await batch.complete();
CO_RETURN_ON_ERROR(readResult.lengthInfo); // OK.
// clear the inline data if the update is built from full chunk read
if (BITFLAGS_CONTAIN(updateReq.featureFlags, FeatureFlags::SEND_DATA_INLINE)) {
BITFLAGS_CLEAR(updateReq.featureFlags, FeatureFlags::SEND_DATA_INLINE);
updateReq.payload.inlinebuf.data.clear();
}
// Rebuild the forwarded request as a WRITE of the bytes just read, starting at offset 0,
// carrying the update version and checksum produced by the local read.
auto length = *readResult.lengthInfo;
updateReq.payload.updateVer = readResult.updateVer;
if (req.options.isSyncing) {
// For an incoming syncing request, take the commit chain version from the read result
// instead of the target's chain version assigned above.
updateReq.options.commitChainVer = batch.front().result().commitChainVer;
}
updateReq.payload.offset = 0;
updateReq.payload.length = length;
updateReq.payload.rdmabuf = readBuf.first(length).toRemoteBuf();
updateReq.payload.checksum = batch.front().state().chunkChecksum;
updateReq.payload.updateType = UpdateType::WRITE;
// Data no larger than the configured limit is also copied into the request and flagged as inline.
if (length <= config_.max_inline_forward_bytes()) {
updateReq.payload.inlinebuf.data.assign(readBuf.ptr(), readBuf.ptr() + length);
BITFLAGS_SET(updateReq.featureFlags, hf3fs::storage::FeatureFlags::SEND_DATA_INLINE);
}
recordGuard.succ();
} else if (isSyncing && !req.payload.isRemove() && chunkEngineJob.chunk() == nullptr) {
// Syncing, not a remove, and the update job carries no chunk: look the chunk up on the
// storage target to obtain the update version to forward.
auto chunkResult = target.storageTarget->queryChunk(req.payload.key.chunkId);
if (UNLIKELY(!chunkResult)) {
XLOGF(ERR, "forward query chunk failed, req {}, error {}", updateReq, chunkResult.error());
co_return makeError(std::move(chunkResult.error()));
}
updateReq.payload.updateVer = chunkResult->updateVer;
}
// Separate recorder for the remote update; distinct from the guard scoped to the read branch above.
auto recordGuard = updateRemoteRecorder.record();
auto addrResult = target.getSuccessorAddr();
if (UNLIKELY(!addrResult)) {
XLOGF(ERR, "target forward addr invalid, target {}", target);
co_return makeError(std::move(addrResult.error()));
}
net::UserRequestOptions reqOptions;
reqOptions.timeout = Duration{timeout};
auto updateResult = co_await components_.messenger.update(*addrResult, updateReq, &reqOptions);
if (UNLIKELY(!updateResult)) {
// RPC-level failure (no response payload available).
// NOTE(review): this branch is taken for every error returned by messenger.update, while the
// log text says "timeout" -- confirm whether other error kinds can reach here.
XLOGF(ERR, "forward timeout, req {}, result {}", updateReq, updateResult);
co_return makeError(std::move(updateResult.error()));
}
if (LIKELY(bool(updateResult->result.lengthInfo))) {
// The successor reported a commit chain version greater than the local chain version:
// reject the result as a chain version mismatch.
if (target.vChainId.chainVer < updateResult->result.commitChainVer) {
auto msg = fmt::format("chain version local < remote, req {} local {} remote {}",
updateReq,
target,
updateResult->result);
XLOG(ERR, msg);
co_return makeError(StorageCode::kChainVersionMismatch, std::move(msg));
}
// Record forwarded byte counts, tagged by target id, in the syncing or the regular write metrics.
auto length = *updateResult->result.lengthInfo;
monitor::TagSet tag;
tag.addTag("instance", fmt::format("{}", target.targetId));
if (isSyncing) {
// While syncing, the returned result carries the original request's update version
// rather than the one reported by the successor.
updateResult->result.updateVer = req.payload.updateVer;
forwardSyncingBytes.addSample(length, tag);
forwardSyncingDist.addSample(length, tag);
} else {
forwardWriteBytes.addSample(length, tag);
forwardWriteDist.addSample(length, tag);
}
recordGuard.succ();
} else {
// The successor answered with an error; recordGuard is intentionally not marked successful here.
XLOGF(ERR, "forward failed, req {}, result {}", updateReq, updateResult->result);
auto errorCode = updateResult->result.lengthInfo.error().code();
if (errorCode == StorageCode::kChecksumMismatch) {
// Recompute the checksum over the local memory referenced by the forwarded rdmabuf and
// compare it with the checksum that was sent. A difference means the local buffer no longer
// matches the request's checksum; per the log message the process is then asked to stop
// via handleSignal(SIGUSR2).
auto reqChecksum = updateReq.payload.checksum;
auto realChecksum = ChecksumInfo::create(reqChecksum.type,
(const uint8_t *)updateReq.payload.rdmabuf.addr(),
updateReq.payload.length);
if (reqChecksum != realChecksum) {
XLOGF(DFATAL,
"local rdma buffer is corrupted local {} != client {}, req: {}, kill self...",
realChecksum,
reqChecksum,
req);
ApplicationBase::handleSignal(SIGUSR2);
}
}
}
// Return the successor's result in both the success and the reported-error case.
co_return updateResult->result;
}