CoTryTask FileWrapper::readFile()

in src/client/cli/admin/FileWrapper.cc [94:189]


CoTryTask<storage::ChecksumInfo> FileWrapper::readFile(AdminEnv &env,
                                                       std::ostream &out,
                                                       const std::vector<Block> &blocks,
                                                       bool checksum,
                                                       bool hex,
                                                       storage::client::TargetSelectionMode mode,
                                                       MD5_CTX *md5 /* = nullptr */,
                                                       bool fillZero /* = false */,
                                                       bool verbose /* = false */,
                                                       uint32_t targetIndex /* = 0 */) {
  auto client = env.storageClientGetter();
  auto buffer = co_await rdmabufPool->allocate();
  if (UNLIKELY(!buffer)) {
    XLOGF(ERR, "allocate buffer failed");
    co_return makeError(RPCCode::kRDMANoBuf);
  }
  std::basic_string_view<uint8_t> data(buffer.ptr(), buffer.size());
  auto readBuffer = storage::client::IOBuffer{buffer};
  auto bufferOffset = 0;
  std::vector<storage::client::ReadIO> batch;
  storage::client::ReadOptions readOptions;
  readOptions.set_enableChecksum(checksum);
  readOptions.targetSelection().set_mode(mode);
  readOptions.targetSelection().set_targetIndex(targetIndex);

  storage::ChecksumInfo checksumInfo;
  for (auto &block : blocks) {
    auto readIO = client->createReadIO(block.chainId,
                                       block.chunkId,
                                       block.offset,
                                       block.length,
                                       (uint8_t *)&data[bufferOffset],
                                       &readBuffer);
    batch.push_back(std::move(readIO));
    bufferOffset += block.length;
    if (&block == &blocks.back() || bufferOffset + (&block + 1)->length > buffer.size()) {
      auto result = co_await client->batchRead(batch, env.userInfo, readOptions);
      CO_RETURN_AND_LOG_ON_ERROR(result);
      for (auto &readIO : batch) {
        auto succLength = 0ul;
        if (readIO.result.lengthInfo) {
          succLength = *readIO.result.lengthInfo;
        } else if (readIO.result.lengthInfo.error().code() == StorageClientCode::kChunkNotFound && fillZero) {
        } else {
          CO_RETURN_AND_LOG_ON_ERROR(readIO.result.lengthInfo);
        }
        if (succLength != readIO.length) {
          if (fillZero) {
            if (verbose) {
              std::cout << fmt::format("{} find hole, expected size {}, actual size {}\n",
                                       readIO.chunkId,
                                       readIO.length,
                                       succLength);
            }
            auto needFill = readIO.length - succLength;
            CO_RETURN_AND_LOG_ON_ERROR(readIO.result.checksum.combine(
                storage::ChecksumInfo::create(storage::ChecksumType::CRC32C, zeros.data(), needFill),
                needFill));
            succLength = readIO.length;
          } else {
            co_return makeError(StatusCode::kInvalidFormat,
                                fmt::format("read is short, chain {} chunk {} offset {} expect {} read {}",
                                            readIO.routingTarget.chainId,
                                            readIO.chunkId,
                                            readIO.offset,
                                            readIO.length,
                                            succLength));
          }
        }
        CO_RETURN_AND_LOG_ON_ERROR(checksumInfo.combine(readIO.result.checksum, succLength));
      }

      // clean up.
      if (hex) {
        out << fmt::format("{:02X}", fmt::join(data.substr(0, bufferOffset), " "));
      } else {
        out.write(reinterpret_cast<const char *>(data.data()), bufferOffset);
      }
      if (md5) {
        int ret = MD5_Update(md5, reinterpret_cast<const char *>(data.data()), bufferOffset);
        if (UNLIKELY(ret != 1)) {
          co_return makeError(StatusCode::kInvalidArg, fmt::format("md5 update error {}", ret));
        }
      }
      if (!out) {
        co_return makeError(StatusCode::kInvalidArg, "write to stream failed");
      }
      batch.clear();
      bufferOffset = 0;
    }
  }
  if (hex) {
    out << std::endl;
  }
  co_return checksumInfo;
}