in src/storage/store/ChunkReplica.cc [132:317]
// Apply a single update (write / truncate / extend / remove) described by `job` to the local chunk replica.
//
// Flow, as implemented below:
//   1. Validate the request range and load the chunk metadata from `store`. A missing chunk is created for
//      non-remove updates; removing a missing chunk succeeds immediately.
//   2. Validate chunk size, chunk state, chain version and the client checksum, assign the new update version,
//      and store the metadata with chunkState = DIRTY before touching any data.
//   3. Perform the data operation (write / truncate / extend / mark-for-removal).
//   4. Recompute the chunk checksum, mark the chunk CLEAN and store the metadata again.
//
// Returns the value produced by the data operation: the resulting chunk length for truncate/extend, 0 for
// remove, and the result of doRealWrite() for a normal write. `job.result()` is filled in progressively
// (versions, chain version, checksum), so on failure it reflects the state reached at that point.
// If the data operation itself fails, the metadata is intentionally left DIRTY.
Result<uint32_t> ChunkReplica::update(ChunkStore &store, UpdateJob &job, folly::CPUThreadPoolExecutor &executor) {
  auto recordGuard = storageUpdateRecorder.record();
  const auto &writeIO = job.updateIO();
  const auto &options = job.options();
  const auto &chunkId = writeIO.key.chunkId;
  const auto &state = job.state();
  auto &result = job.result();
  // Reject non-remove requests whose range does not fit in the chunk. The length test is written as
  // `length > chunkSize - offset` rather than `offset + length > chunkSize` so that a huge `length` cannot wrap
  // the sum around and slip past the check. The subtraction cannot underflow because `offset < chunkSize` has
  // already been established by the short-circuiting `||`.
  if (UNLIKELY(!writeIO.isRemove() &&
               (writeIO.offset >= writeIO.chunkSize || writeIO.length > writeIO.chunkSize - writeIO.offset))) {
    auto msg = fmt::format("chunk {} write offset exceed chunk size {}", chunkId, writeIO);
    XLOG(ERR, msg);
    return makeError(StatusCode::kInvalidArg, std::move(msg));
  }
  // 1. get meta info.
  ChunkInfo chunkInfo;
  bool needCreateChunk = false;
  auto metaResult = store.get(chunkId);
  if (metaResult) {
    chunkInfo = (*metaResult)->second;
  } else if (metaResult.error().code() == StorageCode::kChunkMetadataNotFound) {
    if (writeIO.isRemove()) {
      // Removing a chunk that does not exist is reported as an already-completed removal.
      result.commitVer = result.updateVer = writeIO.updateVer;
      result.commitChainVer = job.commitChainVer();
      return 0;
    } else {
      // First update of this chunk: start from fresh metadata; the chunk is created further below.
      needCreateChunk = true;
      chunkInfo.meta.chainVer = job.commitChainVer();
      chunkInfo.meta.chunkState = ChunkState::CLEAN;
      chunkInfo.meta.innerFileId.chunkSize = writeIO.chunkSize;
    }
  } else {
    RETURN_AND_LOG_ON_ERROR(metaResult);
  }
  ChunkMetadata &meta = chunkInfo.meta;
  // 2. begin to write.
  // Removes use the stored chunk size, so the size-mismatch check below always passes for them.
  auto chunkSize = writeIO.isRemove() ? meta.innerFileId.chunkSize : writeIO.chunkSize;
  // Report the pre-update versions/checksum in case one of the checks below fails.
  result.commitVer = meta.commitVer;
  result.updateVer = meta.updateVer;
  result.checksum = meta.checksum();
  result.commitChainVer = meta.chainVer;
  if (UNLIKELY(meta.innerFileId.chunkSize != chunkSize)) {
    auto msg = fmt::format("chunk {} {} chunk size mismatch {}", chunkId, meta, chunkSize);
    XLOG(ERR, msg);
    return makeError(StorageCode::kChunkSizeMismatch, std::move(msg));
  }
  // A DIRTY chunk (left behind by an update that never reached step 4) only accepts syncing writes.
  if (UNLIKELY(meta.chunkState == ChunkState::DIRTY && !options.isSyncing)) {
    auto msg = fmt::format("chunk {} {} state not valid", chunkId, meta);
    XLOG(ERR, msg);
    return makeError(StorageCode::kChunkNotClean, std::move(msg));
  }
  if (job.commitChainVer() < meta.chainVer && meta.chunkState == ChunkState::COMMIT) {
    auto msg = fmt::format("chunk {} {} chain version mismatch {} {}", chunkId, meta, writeIO, options);
    reportFatalEvent();
    XLOG(DFATAL, msg);
    return makeError(StorageCode::kChainVersionMismatch, std::move(msg));
  }
  // Verify the payload against the client-provided checksum before the store is modified.
  if (writeIO.checksum.type != ChecksumType::NONE && writeIO.length != 0) {
    auto checksum = ChecksumInfo::create(writeIO.checksum.type, state.data, writeIO.length);
    if (checksum != writeIO.checksum) {
      // Mismatches are expected under fault injection, so they are only treated as fatal otherwise.
      if (!job.requestCtx().debugFlags.faultInjectionEnabled()) {
        reportFatalEvent();
      }
      XLOGF_IF(DFATAL,
               !job.requestCtx().debugFlags.faultInjectionEnabled(),
               "Local checksum {} not equal to checksum {} generated by client, write io: {}",
               checksum,
               writeIO.checksum,
               writeIO);
      return makeError(StorageCode::kChecksumMismatch);
    }
  }
  XLOGF(DBG, "chunk {} {} write begin", chunkId, meta);
  // Assign the update version for this request.
  if (options.isSyncing) {
    // Syncing writes take their version directly from the request, without ordering checks.
    XLOGF(DBG9, "chunk {} {} sync write: {}", chunkId, meta, writeIO);
    meta.updateVer = writeIO.updateVer;
    meta.commitVer = ChunkVer{writeIO.updateVer - 1};
    meta.recycleState = RecycleState::NORMAL;
  } else if (writeIO.updateVer > 0) {
    // An explicit version must be newer than the committed version and exactly meta.updateVer + 1.
    if (writeIO.updateVer <= meta.commitVer) {
      auto msg = fmt::format("chunk {} {} committed update {} <= {}", chunkId, meta, writeIO.updateVer, meta.commitVer);
      XLOG(ERR, msg);
      return makeError(StorageCode::kChunkCommittedUpdate, std::move(msg));
    } else if (writeIO.updateVer <= meta.updateVer) {
      auto msg = fmt::format("chunk {} {} stale update {} <= {}", chunkId, meta, writeIO.updateVer, meta.updateVer);
      XLOG(ERR, msg);
      return makeError(StorageCode::kChunkStaleUpdate, std::move(msg));
    } else if (writeIO.updateVer > meta.updateVer + 1) {
      auto msg =
          fmt::format("chunk {} {} missing update {} > {} + 1", chunkId, meta, writeIO.updateVer, meta.updateVer);
      XLOG(ERR, msg);
      return makeError(StorageCode::kChunkMissingUpdate, std::move(msg));
    }
    meta.updateVer = writeIO.updateVer;
  } else {
    // No version in the request: allocate the next one locally, allowing at most one uncommitted update.
    meta.updateVer += 1;
    if (meta.updateVer > meta.commitVer + 1) {
      auto msg = fmt::format("chunk {} {} advance update {}", chunkId, meta, writeIO);
      XLOG(ERR, msg);
      return makeError(StorageCode::kChunkAdvanceUpdate, std::move(msg));
    }
  }
  // Record the chunk as DIRTY before touching data. If the data operation fails, the chunk stays DIRTY and is
  // then rejected by the state check above for non-syncing updates.
  meta.chunkState = ChunkState::DIRTY;
  meta.chainVer = job.commitChainVer();
  meta.lastRequestId = job.requestCtx().tag.requestId;
  meta.lastClientUuid = job.requestCtx().tag.clientId.uuid;
  meta.timestamp = UtcClock::now();
  const bool isAppendWrite = writeIO.offset == meta.size;
  // NOTE(review): the third argument of store.set() appears to select durable persistence. It is skipped for
  // appends, truncate and extend, presumably because those can be recovered from the chunk data. Confirm.
  const bool skipPersist = (writeIO.isWrite() && isAppendWrite) || writeIO.isTruncate() || writeIO.isExtend();
  auto setResult = needCreateChunk ? store.createChunk(chunkId, chunkSize, chunkInfo, executor, job.allowToAllocate())
                                   : store.set(chunkId, chunkInfo, !skipPersist);
  if (UNLIKELY(!setResult)) {
    return makeError(std::move(setResult.error()));
  }
  result.commitChainVer = meta.chainVer;
  result.updateVer = meta.updateVer;
  uint32_t chunkSizeBeforeWrite = meta.size;
  // 3. do write operation.
  Result<uint32_t> writeResult = 0;
  if (writeIO.isTruncate() || writeIO.isExtend()) {
    // For truncate/extend, writeIO.length is the requested chunk length.
    if (writeIO.length <= meta.size) {
      // Only truncate shrinks the chunk; extending to a length that is not larger leaves it unchanged.
      if (writeIO.isTruncate()) {
        writeResult = (meta.size = writeIO.length);
      } else {
        writeResult = meta.size;
      }
    } else {
      // extend the chunk (fill zeros)
      writeResult = doRealWrite(chunkId, chunkInfo, kZeroBytes.data(), writeIO.length - meta.size, meta.size);
      if (writeResult) {
        writeResult = meta.size;  // set result to the actual chunk length if write succeeds
      }
    }
  } else if (writeIO.isRemove()) {
    // remove: only the recycle state is updated here; no chunk data is deleted by this function.
    if (!meta.readyToRemove()) {
      meta.recycleState = RecycleState::REMOVAL_IN_PROGRESS;
    }
    writeResult = 0;
  } else {
    // fill zeros before the write range if there is a gap
    if (meta.size < writeIO.offset) {
      RETURN_AND_LOG_ON_ERROR(chunkInfo.view.write(kZeroBytes.data(), writeIO.offset - meta.size, meta.size, meta));
    }
    // normal write.
    writeResult = doRealWrite(chunkId, chunkInfo, state.data, writeIO.length, writeIO.offset);
    if (writeResult) {
      // NOTE(review): a syncing write sets the chunk size to its own length. This assumes syncing writes start at
      // offset 0 and carry the full chunk content; confirm.
      if (options.isSyncing) meta.size = writeIO.length;
      storageUpdateSeqWrite.addSample(writeIO.offset == chunkSizeBeforeWrite);
    }
  }
  if (UNLIKELY(!writeResult)) {
    return writeResult;  // chunk becomes dirty.
  }
  // update chunk checksum
  auto checksumRes = updateChecksum(chunkInfo, writeIO, chunkSizeBeforeWrite, isAppendWrite);
  if (UNLIKELY(!checksumRes)) {
    return makeError(std::move(checksumRes.error()));
  }
  // 4. finish to write.
  meta.chunkState = ChunkState::CLEAN;
  XLOGF(DBG, "chunk {} {} write finish", chunkId, meta);
  setResult = store.set(chunkId, chunkInfo, !skipPersist);
  if (UNLIKELY(!setResult)) {
    return makeError(std::move(setResult.error()));
  }
  result.checksum = meta.checksum();
  result.commitVer = meta.commitVer;
  result.commitChainVer = meta.chainVer;
  recordGuard.succ();
  return writeResult;
}