in src/client/cli/admin/FindOrphanedChunks.cc [31:175]
// Admin CLI handler that cross-checks dumped chunk metadata against dumped inodes and
// saves every chunk whose inode is not in the loaded inode set ("orphaned" chunk).
//
// Steps:
//   1. Validate usage (no positional args, mgmtd client getter present) and read options.
//   2. Refuse to run if the output directory `orphaned-dir` already exists; otherwise create it.
//   3. Load the inode ids from `inode-path`. If `inode-ids` was given, the function stops
//      after this step and returns (it only reports on the requested inodes).
//   4. Process every file under `chunkmeta-path` on a thread pool; for each file, collect the
//      chunks whose inode id is unknown, apply the chunk-id prefix filters, and dump the
//      remaining orphaned chunks into a same-named file under `orphaned-dir`.
//   5. Log the totals.
//
// Returns an (always empty) output table on success, or an error if the output directory
// exists / cannot be created or the inodes cannot be loaded.
CoTryTask<Dispatcher::OutputTable> findOrphanedChunks(IEnv &ienv,
                                                      const argparse::ArgumentParser &parser,
                                                      const Dispatcher::Args &args) {
  auto &env = dynamic_cast<AdminEnv &>(ienv);
  ENSURE_USAGE(args.empty());
  ENSURE_USAGE(env.mgmtdClientGetter);
  Dispatcher::OutputTable table;

  const auto &inodePath = parser.get<std::string>("inode-path");
  const auto &chunkmetaPath = parser.get<std::string>("chunkmeta-path");
  const auto &inodeInt64Ids = parser.get<std::vector<uint64_t>>("inode-ids");
  const auto &orphanedDir = parser.get<std::string>("orphaned-dir");
  const auto &ignoreChunkIdPrefix = parser.get<std::string>("ignore-chunkid-prefix");
  const auto &onlyChunkIdPrefix = parser.get<std::string>("only-chunkid-prefix");
  const auto &parquetFormat = parser.get<bool>("parquet-format");
  const auto &skipSafetyCheck = parser.get<bool>("skip-safety-check");
  // Use at most half of the hardware threads, but never fewer than one worker:
  // std::thread::hardware_concurrency() may return 0 (unknown) or 1, and a pool of
  // zero threads would never run the tasks queued below.
  const auto parallel =
      std::max<uint32_t>(1, std::min(parser.get<uint32_t>("parallel"), std::thread::hardware_concurrency() / 2));

  // Never write into a pre-existing directory so results of different runs cannot mix.
  if (boost::filesystem::exists(orphanedDir)) {
    XLOGF(CRITICAL, "Output directory for orphaned chunks already exists: {}", orphanedDir);
    co_return makeError(StatusCode::kInvalidArg);
  }

  boost::system::error_code err{};
  boost::filesystem::create_directories(orphanedDir, err);
  if (UNLIKELY(err.failed())) {
    XLOGF(CRITICAL, "Failed to create directory {}, error: {}", orphanedDir, err.message());
    co_return makeError(StatusCode::kIOError);
  }

  // Initialized to "now"; passed by pointer so the inode loader can overwrite it
  // (presumably with the dump time recorded in the inode files - confirm in loadInodeFromFiles).
  std::time_t inodeDumpTime(std::time(nullptr));
  robin_hood::unordered_set<meta::InodeId> inodeIdsToPeek;
  for (const auto &n : inodeInt64Ids) inodeIdsToPeek.insert(meta::InodeId{n});

  auto uniqInodeIds = co_await loadInodeFromFiles(inodePath, inodeIdsToPeek, parallel, parquetFormat, &inodeDumpTime);
  if (!uniqInodeIds) {
    // NOTE(review): FATAL normally terminates the process in folly logging, in which case
    // the co_return below is only a fallback - confirm the intended severity.
    XLOGF(FATAL, "Failed to load inodes from directory {}, error: {}", inodePath, uniqInodeIds.error());
    co_return makeError(uniqInodeIds.error());
  }

  // When specific inode ids were requested, stop here without scanning chunk metadata.
  if (!inodeIdsToPeek.empty()) {
    XLOGF(CRITICAL, "Completed to find {} inodes in path: {}", inodeIdsToPeek.size(), inodePath);
    co_return table;
  }

  std::vector<Path> chunkmetaFilePaths = listFilesFromPath(chunkmetaPath);
  XLOGF(CRITICAL, "Processing {} chunk metadata files in path: {}", chunkmetaFilePaths.size(), chunkmetaPath);

  // Totals shared by all worker tasks.
  std::atomic_size_t numOrphanedChunks = 0;
  std::atomic_size_t numIgnoredOrphanedChunks = 0;
  std::atomic_size_t numProcessedChunks = 0;
  std::atomic_size_t sizeOfOrphanedChunks = 0;
  std::atomic_size_t sizeOfIgnoredOrphanedChunks = 0;

  auto executor = std::make_unique<folly::CPUThreadPoolExecutor>(parallel);
  CountDownLatch<folly::fibers::Baton> loadChunkmetaDone(chunkmetaFilePaths.size());

  for (size_t chunkmetaFileIndex = 0; chunkmetaFileIndex < chunkmetaFilePaths.size(); chunkmetaFileIndex++) {
    // Locals are captured by reference; this is safe because the coroutine waits on the
    // latch and joins the executor before any of them goes out of scope.
    executor->add([&, chunkmetaFileIndex, chunkmetaFilePath = chunkmetaFilePaths[chunkmetaFileIndex]]() {
      // Count down on every exit path so the waiter below is always released.
      auto guard = folly::makeGuard([&loadChunkmetaDone]() { loadChunkmetaDone.countDown(); });

      ChunkMetaTable metadata;
      bool ok =
          parquetFormat ? metadata.loadFromParquetFile(chunkmetaFilePath) : metadata.loadFromFile(chunkmetaFilePath);
      if (!ok) {
        XLOGF(FATAL, "Failed to load file: {}", chunkmetaFilePath);
        return false;
      }

      // Safety check: the chunk metadata must have been dumped strictly before the inode
      // snapshot time, unless the check is explicitly disabled.
      if (!skipSafetyCheck && metadata.timestamp >= inodeDumpTime) {
        XLOGF(FATAL,
              "Chunk metadata dump time '{:%c}' >= inode snapshot time '{:%c}', skipping file: {}",
              fmt::localtime(metadata.timestamp),
              fmt::localtime(inodeDumpTime),
              chunkmetaFilePath);
        return false;
      }

      ChunkMetaTable orphanedMetadata{.chainId = metadata.chainId,
                                      .targetId = metadata.targetId,
                                      .timestamp = metadata.timestamp};

      // Per-file tallies; published to the shared atomics once at the end of the scan
      // instead of once per chunk.
      size_t fileOrphanedChunks = 0;
      size_t fileOrphanedBytes = 0;
      size_t fileIgnoredChunks = 0;
      size_t fileIgnoredBytes = 0;
      const bool hasPrefixFilter = !ignoreChunkIdPrefix.empty() || !onlyChunkIdPrefix.empty();

      for (const auto &chunk : metadata.chunks) {
        const auto &chunkmeta = chunk.chunkmeta;
        // Skip entries with a default (empty) chunk id and chunks whose inode is known.
        if (chunkmeta.chunkId == storage::ChunkId{} || uniqInodeIds->count(chunk.inodeId)) {
          continue;
        }

        // Build the textual chunk id only when a prefix filter is configured, and only once.
        bool ignored = false;
        if (hasPrefixFilter) {
          const auto chunkIdStr = chunkmeta.chunkId.toString();
          ignored = (!ignoreChunkIdPrefix.empty() && chunkIdStr.starts_with(ignoreChunkIdPrefix)) ||
                    (!onlyChunkIdPrefix.empty() && !chunkIdStr.starts_with(onlyChunkIdPrefix));
        }

        if (ignored) {
          XLOGF(DBG, "Ignore an orphaned chunk on {}@{}: {}", metadata.targetId, metadata.chainId, chunkmeta);
          fileIgnoredChunks++;
          fileIgnoredBytes += chunkmeta.length;
        } else {
          XLOGF(DBG, "Found an orphaned chunk on {}@{}: {}", metadata.targetId, metadata.chainId, chunkmeta);
          fileOrphanedChunks++;
          fileOrphanedBytes += chunkmeta.length;
          orphanedMetadata.chunks.push_back(chunk);
        }
      }

      numIgnoredOrphanedChunks += fileIgnoredChunks;
      sizeOfIgnoredOrphanedChunks += fileIgnoredBytes;
      numOrphanedChunks += fileOrphanedChunks;
      sizeOfOrphanedChunks += fileOrphanedBytes;

      if (!orphanedMetadata.chunks.empty()) {
        // Output file keeps the input file name; in parquet mode the extension is replaced.
        auto orphanedChunkFilePath = Path(orphanedDir) / chunkmetaFilePath.filename();
        bool writeOk = parquetFormat
                           ? orphanedMetadata.dumpToParquetFile(orphanedChunkFilePath.replace_extension(".parquet"))
                           : orphanedMetadata.dumpToFile(orphanedChunkFilePath, true /*jsonFormat*/);
        if (writeOk) {
          XLOGF(CRITICAL,
                "{} orphaned chunks on {}@{} saved to file: {}",
                orphanedMetadata.chunks.size(),
                orphanedMetadata.targetId,
                orphanedMetadata.chainId,
                orphanedChunkFilePath);
        } else {
          // Do not fail silently: the totals logged at the end still include these chunks.
          XLOGF(CRITICAL,
                "Failed to save {} orphaned chunks on {}@{} to file: {}",
                orphanedMetadata.chunks.size(),
                orphanedMetadata.targetId,
                orphanedMetadata.chainId,
                orphanedChunkFilePath);
        }
      }

      XLOGF(WARN,
            "#{}/{} Processed metadata of {} chunks in file: {}",
            chunkmetaFileIndex + 1,
            chunkmetaFilePaths.size(),
            metadata.chunks.size(),
            chunkmetaFilePath);
      numProcessedChunks += metadata.chunks.size();
      return true;
    });
  }

  // Wait for every task to signal completion, then join the pool's threads.
  co_await loadChunkmetaDone.wait();
  executor->join();

  XLOGF(CRITICAL,
        "In total {}/{} orphaned chunks ({:.3f} GB) ignored",
        numIgnoredOrphanedChunks.load(),
        numProcessedChunks.load(),
        double(sizeOfIgnoredOrphanedChunks.load()) / 1_GB);
  XLOGF(CRITICAL,
        "In total {}/{} orphaned chunks ({:.3f} GB) saved to directory: {}",
        numOrphanedChunks.load(),
        numProcessedChunks.load(),
        double(sizeOfOrphanedChunks.load()) / 1_GB,
        orphanedDir);
  co_return table;
}