in src/client/cli/admin/DumpChunkMeta.cc [28:166]
// Admin CLI command: dump the chunk metadata of storage targets into a new directory.
//
// Options read from `parser`:
//   "chain-ids"      chain ids to dump; an empty list selects every chain in the routing info.
//   "chunkmeta-dir"  output directory; it must not exist yet and is created here.
//   "parallel"       number of concurrent dump tasks, capped at half of
//                    std::thread::hardware_concurrency(); 0 selects the sequential path.
//   "parquet-format" write "<chain>-<target>.parquet" via dumpToParquetFile() instead of
//                    "<chain>-<target>.meta" via dumpToFile().
//   "only-head"      only consider the first target listed for each chain.
//
// `args` must be empty (checked by ENSURE_USAGE; presumably this reports a usage error otherwise).
//
// Returns an empty output table on success. Failures visible in this function:
//   StatusCode::kInvalidArg           the output directory already exists
//   StatusCode::kIOError              the directory cannot be created, or a target dump failed
//   StorageClientCode::kRoutingError  a selected chain has an empty target list
// A failed routing-info refresh is handed to CO_RETURN_AND_LOG_ON_ERROR (presumably propagated).
CoTryTask<Dispatcher::OutputTable> dumpChunkMeta(IEnv &ienv,
                                                 const argparse::ArgumentParser &parser,
                                                 const Dispatcher::Args &args) {
  auto &env = dynamic_cast<AdminEnv &>(ienv);
  ENSURE_USAGE(args.empty());
  Dispatcher::OutputTable table;

  const auto &chainIds = parser.get<std::vector<uint32_t>>("chain-ids");
  const auto &chunkmetaDir = parser.get<std::string>("chunkmeta-dir");
  // Never use more than half of the hardware threads. The result may be 0, which selects the
  // sequential path below.
  const auto parallel = std::min(parser.get<uint32_t>("parallel"), std::thread::hardware_concurrency() / 2);
  const auto &parquetFormat = parser.get<bool>("parquet-format");
  const auto &onlyHead = parser.get<bool>("only-head");

  // Refuse to write into an existing directory so that earlier dumps are never mixed with this one.
  if (boost::filesystem::exists(chunkmetaDir)) {
    XLOGF(CRITICAL, "Output directory for chunk metadata already exists: {}", chunkmetaDir);
    co_return makeError(StatusCode::kInvalidArg);
  }

  boost::system::error_code err{};
  boost::filesystem::create_directories(chunkmetaDir, err);

  if (UNLIKELY(err.failed())) {
    XLOGF(CRITICAL, "Failed to create directory {}, error: {}", chunkmetaDir, err.message());
    co_return makeError(StatusCode::kIOError);
  }

  XLOGF(CRITICAL, "Saving chunk metadata to directory: {}", chunkmetaDir);

  CO_RETURN_AND_LOG_ON_ERROR(co_await env.mgmtdClientGetter()->refreshRoutingInfo(/*force=*/true));
  auto routingInfo = env.mgmtdClientGetter()->getRoutingInfo();
  auto chainInfos = routingInfo->raw()->chains;
  XLOGF(CRITICAL, "Found {} replication chains in {}", chainInfos.size(), routingInfo->raw()->routingInfoVersion);

  // Collect the chains to dump, ordered by chain id. When chain ids were requested explicitly,
  // only those chains are kept, and every requested id that does not appear in the routing info
  // is remembered so it can be reported instead of being silently ignored.
  const std::set<uint32_t> selectedChainIds(chainIds.begin(), chainIds.end());
  std::set<uint32_t> unmatchedChainIds = selectedChainIds;
  std::map<flat::ChainId, flat::ChainInfo> sortedChainInfos;

  for (const auto &[chainId, chainInfo] : chainInfos) {
    const auto rawChainId = uint32_t(chainId);
    if (!selectedChainIds.empty() && selectedChainIds.count(rawChainId) == 0) continue;
    unmatchedChainIds.erase(rawChainId);
    sortedChainInfos.emplace(chainId, chainInfo);
  }

  for (const auto missingChainId : unmatchedChainIds) {
    XLOGF(WARN, "Selected chain id {} not found in routing info, skipped", missingChainId);
  }

  // Number of chains that will actually be processed; used as the denominator of progress logs.
  const size_t numChainsToSave = sortedChainInfos.size();
  std::atomic_size_t numTargetsSaved = 0;

  // Fetches all chunk metadata of one target and writes it to one file in the output directory.
  // Yields true iff the file was written. `env` and `numTargetsSaved` are captured by reference:
  // the closure lives in this coroutine's frame and every task created from it is awaited (or
  // destroyed unstarted) before this function returns.
  auto dumpTargetMeta = [&env, chunkmetaDir, parquetFormat, numChainsToSave, &numTargetsSaved](
                            const size_t chainIndex,
                            const flat::ChainId chainId,
                            const flat::TargetId targetId) -> CoTask<bool> {
    XLOGF(INFO, "#{}/{} Getting chunk metadata from target: {}@{}", chainIndex, numChainsToSave, targetId, chainId);

    auto chunkMetadata = co_await env.storageClientGetter()->getAllChunkMetadata(chainId, targetId);

    if (!chunkMetadata) {
      XLOGF(ERR,
            "Failed to get chunk metadata from target: {}@{}, error: {}",
            targetId,
            chainId,
            chunkMetadata.error());
      co_return false;
    }

    // A single timestamp is shared by the table header and by every row of this target.
    time_t timestamp = UtcClock::secondsSinceEpoch();
    ChunkMetaTable metadata{chainId, targetId, timestamp};

    for (const auto &chunkmeta : *chunkMetadata) {
      auto metaChunkId = meta::ChunkId::unpack(chunkmeta.chunkId.data());
      metadata.chunks.push_back({timestamp, chainId, targetId, metaChunkId.inode(), chunkmeta});
    }

    XLOGF(INFO,
          "#{}/{} Found {} chunks on target: {}@{}",
          chainIndex,
          numChainsToSave,
          metadata.chunks.size(),
          targetId,
          chainId);

    auto filePath =
        Path(chunkmetaDir) / fmt::format("{}-{}.meta", uint32_t(metadata.chainId), uint64_t(metadata.targetId));
    // replace_extension() modifies filePath in place, so the log lines below show the real name.
    bool writeOk = parquetFormat ? metadata.dumpToParquetFile(filePath.replace_extension(".parquet"))
                                 : metadata.dumpToFile(filePath);
    numTargetsSaved += writeOk;

    if (writeOk) {
      XLOGF(WARN,
            "#{}/{} Metadata of {} chunks on {}@{} saved to file: {}",
            chainIndex,
            numChainsToSave,
            metadata.chunks.size(),
            targetId,
            chainId,
            filePath);
    } else {
      // Name the failing target and file; the caller only reports that "some" task failed.
      XLOGF(ERR,
            "#{}/{} Failed to save metadata of {} chunks on {}@{} to file: {}",
            chainIndex,
            numChainsToSave,
            metadata.chunks.size(),
            targetId,
            chainId,
            filePath);
    }

    co_return writeOk;
  };

  // The thread pool is only needed on the parallel path.
  std::unique_ptr<folly::CPUThreadPoolExecutor> executor;
  if (parallel > 0) executor = std::make_unique<folly::CPUThreadPoolExecutor>(parallel);

  std::vector<folly::coro::TaskWithExecutor<bool>> tasks;
  size_t chainIndex = 0;

  for (const auto &[chainId, chainInfo] : sortedChainInfos) {
    chainIndex++;

    if (chainInfo.targets.empty()) {
      XLOGF(CRITICAL, "Empty list of targets on {}: {}", chainId, chainInfo);
      co_return makeError(StorageClientCode::kRoutingError);
    }

    for (const auto &target : chainInfo.targets) {
      if (target.publicState == flat::PublicTargetState::SERVING) {
        if (parallel > 0) {
          // Only queued here; the tasks are run by collectAllWindowed() below.
          tasks.push_back(dumpTargetMeta(chainIndex, chainId, target.targetId)
                              .scheduleOn(folly::Executor::getKeepAliveToken(*executor)));
        } else {
          // Sequential path: stop at the first target that cannot be dumped.
          bool ok = co_await dumpTargetMeta(chainIndex, chainId, target.targetId);
          if (!ok) co_return makeError(StatusCode::kIOError);
        }
      } else {
        XLOGF(WARN, "Skip target {}@{} not in serving state: {}", target.targetId, chainId, target);
      }

      // With "only-head" just the first listed target is considered, whether or not it is serving.
      if (onlyHead) break;
    }
  }

  if (parallel > 0 && !tasks.empty()) {
    // Run at most `parallel` dumps at a time and wait for all of them.
    auto results = co_await folly::coro::collectAllWindowed(std::move(tasks), parallel);

    if (!std::all_of(results.begin(), results.end(), [](bool ok) { return ok; })) {
      XLOGF(CRITICAL, "Some of the chunkmeta dump tasks failed");
      co_return makeError(StatusCode::kIOError);
    }
  }

  XLOGF(CRITICAL,
        "Chunk metadata on {} targets from {} chains saved to directory: {}",
        numTargetsSaved.load(),
        numChainsToSave,
        chunkmetaDir);

  co_return table;
}