Result ChunkReplica::update()

in src/storage/store/ChunkReplica.cc [132:317]


/// Applies the update described by `job` (write, truncate, extend or remove) to one chunk.
///
/// Outline:
///   0. validate the requested range (skipped for remove);
///   1. load the chunk metadata, or prepare a fresh one when the chunk does not exist yet;
///   2. validate chunk size / state / chain version / payload checksum, advance the update
///      version, mark the chunk DIRTY and store the metadata (creating the chunk if needed);
///   3. perform the actual operation;
///   4. update the chunk checksum, mark the chunk CLEAN and store the metadata again.
///
/// @param store     Chunk store used to look up, create and store the chunk metadata.
/// @param job       Source of the update IO, options, data buffer and request context; its
///                  `result()` is filled in with versions and checksum as the update progresses.
/// @param executor  Forwarded to `store.createChunk` when the chunk has to be created.
/// @return On success the value produced by the operation (0 for remove, `meta.size` for
///         truncate/extend, the result of `doRealWrite` for a normal write); otherwise an error.
Result<uint32_t> ChunkReplica::update(ChunkStore &store, UpdateJob &job, folly::CPUThreadPoolExecutor &executor) {
  // Marked as successful only right before the final return at the end of this function.
  auto recordGuard = storageUpdateRecorder.record();

  const auto &writeIO = job.updateIO();
  const auto &options = job.options();
  const auto &chunkId = writeIO.key.chunkId;
  const auto &state = job.state();
  auto &result = job.result();

  // 0. range check. The length is compared against the room left after `offset` instead of
  // computing `offset + length`: with unsigned fields that sum can wrap around and let an
  // out-of-range request slip through. The subtraction cannot underflow because
  // `offset < chunkSize` is guaranteed once the first operand of `||` is false.
  if (UNLIKELY(!writeIO.isRemove() &&
               (writeIO.offset >= writeIO.chunkSize || writeIO.length > writeIO.chunkSize - writeIO.offset))) {
    auto msg = fmt::format("chunk {} write offset exceed chunk size {}", chunkId, writeIO);
    XLOG(ERR, msg);
    return makeError(StatusCode::kInvalidArg, std::move(msg));
  }

  // 1. get meta info.
  ChunkInfo chunkInfo;
  bool needCreateChunk = false;
  auto metaResult = store.get(chunkId);
  if (metaResult) {
    // Work on a local copy; it is handed back to the store via createChunk/set below.
    chunkInfo = (*metaResult)->second;
  } else if (metaResult.error().code() == StorageCode::kChunkMetadataNotFound) {
    if (writeIO.isRemove()) {
      // Removing a chunk that does not exist: report the request's version as both the
      // update and the commit version and succeed without touching the store.
      result.commitVer = result.updateVer = writeIO.updateVer;
      result.commitChainVer = job.commitChainVer();
      return 0;
    } else {
      // First update of this chunk: start from a default ChunkInfo and create it in step 2.
      needCreateChunk = true;
      chunkInfo.meta.chainVer = job.commitChainVer();
      chunkInfo.meta.chunkState = ChunkState::CLEAN;
      chunkInfo.meta.innerFileId.chunkSize = writeIO.chunkSize;
    }
  } else {
    RETURN_AND_LOG_ON_ERROR(metaResult);
  }
  ChunkMetadata &meta = chunkInfo.meta;

  // 2. begin to write.
  // For remove the stored chunk size is used, so the size mismatch check below only applies
  // to the other update types.
  auto chunkSize = writeIO.isRemove() ? meta.innerFileId.chunkSize : writeIO.chunkSize;
  // Fill the result with the current chunk state first, so that the early error returns
  // below leave these values in job.result().
  result.commitVer = meta.commitVer;
  result.updateVer = meta.updateVer;
  result.checksum = meta.checksum();
  result.commitChainVer = meta.chainVer;
  if (UNLIKELY(meta.innerFileId.chunkSize != chunkSize)) {
    auto msg = fmt::format("chunk {} {} chunk size mismatch {}", chunkId, meta, chunkSize);
    XLOG(ERR, msg);
    return makeError(StorageCode::kChunkSizeMismatch, std::move(msg));
  }
  // A DIRTY chunk is only accepted when the update is a syncing write.
  if (UNLIKELY(meta.chunkState == ChunkState::DIRTY && !options.isSyncing)) {
    auto msg = fmt::format("chunk {} {} state not valid", chunkId, meta);
    XLOG(ERR, msg);
    // Reuse the message that was just logged instead of formatting it a second time.
    return makeError(StorageCode::kChunkNotClean, std::move(msg));
  }
  if (job.commitChainVer() < meta.chainVer && meta.chunkState == ChunkState::COMMIT) {
    auto msg = fmt::format("chunk {} {} chain version mismatch {} {}", chunkId, meta, writeIO, options);
    reportFatalEvent();
    XLOG(DFATAL, msg);
    return makeError(StorageCode::kChainVersionMismatch, std::move(msg));
  }

  // Verify the payload against the checksum carried by the request, unless the request has
  // no checksum or no data.
  if (writeIO.checksum.type != ChecksumType::NONE && writeIO.length != 0) {
    auto checksum = ChecksumInfo::create(writeIO.checksum.type, state.data, writeIO.length);
    if (checksum != writeIO.checksum) {
      // The fatal event and the DFATAL log are suppressed when the request has fault
      // injection enabled; the error itself is returned either way.
      if (!job.requestCtx().debugFlags.faultInjectionEnabled()) {
        reportFatalEvent();
      }
      XLOGF_IF(DFATAL,
               !job.requestCtx().debugFlags.faultInjectionEnabled(),
               "Local checksum {} not equal to checksum {} generated by client, write io: {}",
               checksum,
               writeIO.checksum,
               writeIO);
      return makeError(StorageCode::kChecksumMismatch);
    }
  }

  XLOGF(DBG, "chunk {} {} write begin", chunkId, meta);

  if (options.isSyncing) {
    // Syncing write: adopt the request's update version unconditionally and place the
    // commit version right behind it.
    XLOGF(DBG9, "chunk {} {} sync write: {}", chunkId, meta, writeIO);
    meta.updateVer = writeIO.updateVer;
    meta.commitVer = ChunkVer{writeIO.updateVer - 1};
    meta.recycleState = RecycleState::NORMAL;
  } else if (writeIO.updateVer > 0) {
    // The request names its update version: it must be above the commit version and
    // exactly one past the current update version.
    if (writeIO.updateVer <= meta.commitVer) {
      auto msg = fmt::format("chunk {} {} committed update {} <= {}", chunkId, meta, writeIO.updateVer, meta.commitVer);
      XLOG(ERR, msg);
      return makeError(StorageCode::kChunkCommittedUpdate, std::move(msg));
    } else if (writeIO.updateVer <= meta.updateVer) {
      auto msg = fmt::format("chunk {} {} stale update {} <= {}", chunkId, meta, writeIO.updateVer, meta.updateVer);
      XLOG(ERR, msg);
      return makeError(StorageCode::kChunkStaleUpdate, std::move(msg));
    } else if (writeIO.updateVer > meta.updateVer + 1) {
      auto msg =
          fmt::format("chunk {} {} missing update {} > {} + 1", chunkId, meta, writeIO.updateVer, meta.updateVer);
      XLOG(ERR, msg);
      return makeError(StorageCode::kChunkMissingUpdate, std::move(msg));
    }
    meta.updateVer = writeIO.updateVer;
  } else {
    // No version in the request: take the next one, which may be at most one ahead of the
    // commit version. `meta` is part of the local copy, so the increment is not stored when
    // the check fails.
    meta.updateVer += 1;
    if (meta.updateVer > meta.commitVer + 1) {
      auto msg = fmt::format("chunk {} {} advance update {}", chunkId, meta, writeIO);
      XLOG(ERR, msg);
      return makeError(StorageCode::kChunkAdvanceUpdate, std::move(msg));
    }
  }

  // Mark the chunk DIRTY and hand the metadata to the store before any data is touched.
  meta.chunkState = ChunkState::DIRTY;
  meta.chainVer = job.commitChainVer();
  meta.lastRequestId = job.requestCtx().tag.requestId;
  meta.lastClientUuid = job.requestCtx().tag.clientId.uuid;
  meta.timestamp = UtcClock::now();
  // A write that starts exactly at the current end of the chunk.
  const bool isAppendWrite = writeIO.offset == meta.size;
  // NOTE(review): the negation of this flag is passed as the third argument of store.set,
  // which presumably controls whether the metadata is persisted - confirm against ChunkStore::set.
  const bool skipPersist = (writeIO.isWrite() && isAppendWrite) || writeIO.isTruncate() || writeIO.isExtend();
  auto setResult = needCreateChunk ? store.createChunk(chunkId, chunkSize, chunkInfo, executor, job.allowToAllocate())
                                   : store.set(chunkId, chunkInfo, !skipPersist);
  if (UNLIKELY(!setResult)) {
    return makeError(std::move(setResult.error()));
  }
  result.commitChainVer = meta.chainVer;
  result.updateVer = meta.updateVer;

  // Size before the operation; used for the sequential-write sample and the checksum update.
  uint32_t chunkSizeBeforeWrite = meta.size;

  // 3. do write operation.
  Result<uint32_t> writeResult = 0;
  if (writeIO.isTruncate() || writeIO.isExtend()) {
    if (writeIO.length <= meta.size) {
      if (writeIO.isTruncate()) {
        // truncate shrinks the chunk to the requested length.
        writeResult = (meta.size = writeIO.length);
      } else {
        // extend never shrinks: the chunk is already long enough.
        writeResult = meta.size;
      }
    } else {
      // extend the chunk (fill zeros)
      // NOTE(review): assumes kZeroBytes holds at least `length - size` bytes and that
      // doRealWrite updates meta.size - confirm.
      writeResult = doRealWrite(chunkId, chunkInfo, kZeroBytes.data(), writeIO.length - meta.size, meta.size);
      if (writeResult) {
        writeResult = meta.size;  // set result to the actual chunk length if write succeeds
      }
    }
  } else if (writeIO.isRemove()) {
    // remove.
    if (!meta.readyToRemove()) {
      meta.recycleState = RecycleState::REMOVAL_IN_PROGRESS;
    }
    writeResult = 0;
  } else {
    // fill zeros before the write range if there is a gap
    // NOTE(review): assumes kZeroBytes holds at least `offset - size` bytes - confirm.
    if (meta.size < writeIO.offset) {
      RETURN_AND_LOG_ON_ERROR(chunkInfo.view.write(kZeroBytes.data(), writeIO.offset - meta.size, meta.size, meta));
    }

    // normal write.
    writeResult = doRealWrite(chunkId, chunkInfo, state.data, writeIO.length, writeIO.offset);
    if (writeResult) {
      // A syncing write sets the chunk size to the length of the written data.
      if (options.isSyncing) meta.size = writeIO.length;
      storageUpdateSeqWrite.addSample(writeIO.offset == chunkSizeBeforeWrite);
    }
  }
  if (UNLIKELY(!writeResult)) {
    return writeResult;  // chunk becomes dirty.
  }

  // update chunk checksum
  auto checksumRes = updateChecksum(chunkInfo, writeIO, chunkSizeBeforeWrite, isAppendWrite);
  if (UNLIKELY(!checksumRes)) {
    return makeError(std::move(checksumRes.error()));
  }

  // 4. finish to write.
  meta.chunkState = ChunkState::CLEAN;

  XLOGF(DBG, "chunk {} {} write finish", chunkId, meta);
  setResult = store.set(chunkId, chunkInfo, !skipPersist);
  if (UNLIKELY(!setResult)) {
    return makeError(std::move(setResult.error()));
  }
  // Report the final state of the chunk to the caller.
  result.checksum = meta.checksum();
  result.commitVer = meta.commitVer;
  result.commitChainVer = meta.chainVer;

  recordGuard.succ();
  return writeResult;
}