in src/client/cli/admin/FileWrapper.cc [94:189]
/**
 * Read `blocks` through the storage client and stream their bytes to `out`.
 *
 * Blocks are packed back to back into a single RDMA buffer and issued with
 * `batchRead`. A batch is flushed when the last block has been queued or when
 * the next block would no longer fit into the buffer. After each flush the
 * buffer contents are written to `out` (raw, or as space-separated upper-case
 * hex when `hex` is set) and, if `md5` is non-null, fed into the MD5 context.
 *
 * @param env          supplies the storage client and the user info for reads.
 * @param out          destination stream for the data that was read.
 * @param blocks       chunk ranges to read, in output order.
 * @param checksum     forwarded to `ReadOptions::set_enableChecksum`.
 * @param hex          emit a hex dump (terminated by a newline) instead of raw bytes.
 * @param mode         target selection mode forwarded to the read options.
 * @param md5          optional MD5 context updated with every byte emitted.
 * @param fillZero     treat missing chunks / short reads as holes: the missing
 *                     tail is emitted as zero bytes and folded into the checksum
 *                     as zeros. When false a length mismatch is an error.
 * @param verbose      report every filled hole on std::cout.
 * @param targetIndex  target index forwarded to the read options.
 * @return the combination of the per-IO checksums in block order, or an error
 *         if buffer allocation, the batch read, a single IO, checksum
 *         combination, the MD5 update or the stream write fails.
 */
CoTryTask<storage::ChecksumInfo> FileWrapper::readFile(AdminEnv &env,
                                                       std::ostream &out,
                                                       const std::vector<Block> &blocks,
                                                       bool checksum,
                                                       bool hex,
                                                       storage::client::TargetSelectionMode mode,
                                                       MD5_CTX *md5 /* = nullptr */,
                                                       bool fillZero /* = false */,
                                                       bool verbose /* = false */,
                                                       uint32_t targetIndex /* = 0 */) {
  auto client = env.storageClientGetter();
  auto buffer = co_await rdmabufPool->allocate();
  if (UNLIKELY(!buffer)) {
    XLOGF(ERR, "allocate buffer failed");
    co_return makeError(RPCCode::kRDMANoBuf);
  }
  std::basic_string_view<uint8_t> data(buffer.ptr(), buffer.size());
  auto readBuffer = storage::client::IOBuffer{buffer};
  // Number of buffer bytes already assigned to the IOs of the current batch.
  size_t bufferOffset = 0;
  std::vector<storage::client::ReadIO> batch;
  storage::client::ReadOptions readOptions;
  readOptions.set_enableChecksum(checksum);
  readOptions.targetSelection().set_mode(mode);
  readOptions.targetSelection().set_targetIndex(targetIndex);
  storage::ChecksumInfo checksumInfo;
  for (auto &block : blocks) {
    // The flush condition below only guarantees that a block starts a fresh
    // batch when it does not fit behind its predecessors; a block larger than
    // the whole buffer can never fit and must be rejected before it is mapped
    // past the end of the buffer.
    if (UNLIKELY(block.length > buffer.size())) {
      co_return makeError(
          StatusCode::kInvalidArg,
          fmt::format("block length {} exceeds read buffer size {}", block.length, buffer.size()));
    }
    auto readIO = client->createReadIO(block.chainId,
                                       block.chunkId,
                                       block.offset,
                                       block.length,
                                       const_cast<uint8_t *>(&data[bufferOffset]),
                                       &readBuffer);
    batch.push_back(std::move(readIO));
    bufferOffset += block.length;

    // Keep queuing while this is not the last block and the next one still fits.
    // (`&block + 1` is only evaluated when `block` is not the last element.)
    const bool isLast = &block == &blocks.back();
    if (!isLast && bufferOffset + (&block + 1)->length <= buffer.size()) {
      continue;
    }

    auto result = co_await client->batchRead(batch, env.userInfo, readOptions);
    CO_RETURN_AND_LOG_ON_ERROR(result);

    // IOs were laid out consecutively from offset 0, so the start of each IO's
    // slice in the buffer is the sum of the lengths of the IOs before it.
    size_t ioOffset = 0;
    for (auto &readIO : batch) {
      size_t succLength = 0;
      if (readIO.result.lengthInfo) {
        succLength = *readIO.result.lengthInfo;
      } else if (!(readIO.result.lengthInfo.error().code() == StorageClientCode::kChunkNotFound && fillZero)) {
        CO_RETURN_AND_LOG_ON_ERROR(readIO.result.lengthInfo);
      }
      // else: chunk is missing and holes are allowed -> treat it as a zero-length read.

      if (succLength != readIO.length) {
        // Only a short read is a hole. If more bytes than requested are
        // reported, `length - succLength` would wrap, so that case is always
        // an error.
        if (fillZero && succLength < readIO.length) {
          if (verbose) {
            std::cout << fmt::format("{} find hole, expected size {}, actual size {}\n",
                                     readIO.chunkId,
                                     readIO.length,
                                     succLength);
          }
          const size_t needFill = readIO.length - succLength;
          // The buffer is reused across batches, so the unread tail still holds
          // stale bytes. Zero it so the emitted data and the MD5 match the
          // zeros that are folded into the checksum below.
          std::fill_n(const_cast<uint8_t *>(data.data()) + ioOffset + succLength, needFill, uint8_t{0});
          // NOTE(review): `zeros` is defined outside this function; this assumes
          // it holds at least `needFill` zero bytes - confirm against its definition.
          CO_RETURN_AND_LOG_ON_ERROR(readIO.result.checksum.combine(
              storage::ChecksumInfo::create(storage::ChecksumType::CRC32C, zeros.data(), needFill),
              needFill));
          succLength = readIO.length;
        } else {
          co_return makeError(StatusCode::kInvalidFormat,
                              fmt::format("read is short, chain {} chunk {} offset {} expect {} read {}",
                                          readIO.routingTarget.chainId,
                                          readIO.chunkId,
                                          readIO.offset,
                                          readIO.length,
                                          succLength));
        }
      }
      CO_RETURN_AND_LOG_ON_ERROR(checksumInfo.combine(readIO.result.checksum, succLength));
      ioOffset += readIO.length;
    }

    // Emit everything gathered in this batch.
    if (hex) {
      out << fmt::format("{:02X}", fmt::join(data.substr(0, bufferOffset), " "));
    } else {
      out.write(reinterpret_cast<const char *>(data.data()), bufferOffset);
    }
    if (md5) {
      int ret = MD5_Update(md5, reinterpret_cast<const char *>(data.data()), bufferOffset);
      if (UNLIKELY(ret != 1)) {
        co_return makeError(StatusCode::kInvalidArg, fmt::format("md5 update error {}", ret));
      }
    }
    if (!out) {
      co_return makeError(StatusCode::kInvalidArg, "write to stream failed");
    }

    // Start the next batch at the beginning of the buffer.
    batch.clear();
    bufferOffset = 0;
  }
  if (hex) {
    // Terminate the hex dump with a newline and flush the stream.
    out << std::endl;
  }
  co_return checksumInfo;
}