in src/client/cli/admin/RemoveChunks.cc [27:198]
/**
 * Admin command: remove orphaned chunks listed in previously dumped chunk-metadata files.
 *
 * Steps:
 *  1. Dump the current inodes from FoundationDB into `inode-dir` (which must not exist yet) and
 *     load the set of unique inode ids back from that dump.
 *  2. For every file under `orphaned-path`, load its orphaned chunk metadata (parquet or JSON) and
 *     abort the command if that metadata was dumped at or after the inode snapshot, or if any
 *     listed chunk belongs to an inode that is present in the snapshot.
 *  3. Probe every listed chunk with a 1-byte read; every chunk not reported as kChunkNotFound
 *     (including chunks whose read failed for another reason) is queued for removal.
 *  4. Only when `do-remove` is set, send the remove requests and tally how many chunks were
 *     processed; otherwise (dry run) just log what would be removed.
 *
 * @param ienv   must be an AdminEnv (the reference dynamic_cast throws std::bad_cast otherwise).
 * @param parser parsed command options (see the option names read below).
 * @param args   positional arguments; must be empty.
 * @return an empty output table on success; progress and totals are reported through the log.
 *         Any error aborts the whole command, leaving later files unprocessed.
 */
CoTryTask<Dispatcher::OutputTable> removeChunks(IEnv &ienv,
                                                const argparse::ArgumentParser &parser,
                                                const Dispatcher::Args &args) {
  auto &env = dynamic_cast<AdminEnv &>(ienv);
  ENSURE_USAGE(args.empty());
  ENSURE_USAGE(env.mgmtdClientGetter);
  ENSURE_USAGE(env.storageClientGetter);
  Dispatcher::OutputTable table;

  // dump latest inodes
  const auto &fdbClusterFile = parser.get<std::string>("fdb-cluster-file");
  const auto &numInodesPerFile = parser.get<uint32_t>("num-inodes-perfile");
  const auto &inodeDir = parser.get<std::string>("inode-dir");
  const auto &parquetFormat = parser.get<bool>("parquet-format");
  // Refuse to reuse an existing directory so the snapshot cannot be mixed with stale inode files.
  if (boost::filesystem::exists(inodeDir)) {
    XLOGF(CRITICAL, "Output directory for inodes already exists: {}", inodeDir);
    co_return makeError(StatusCode::kInvalidArg);
  }
  auto dumpRes = co_await dumpInodesFromFdb(fdbClusterFile, numInodesPerFile, inodeDir, parquetFormat);
  if (!dumpRes) co_return makeError(dumpRes.error());

  // load the inode dump
  // Initialized to "now"; loadInodeFromFiles receives its address and presumably overwrites it with
  // the snapshot time of the dump -- NOTE(review): confirm against loadInodeFromFiles.
  std::time_t inodeDumpTime(std::time(nullptr));
  auto uniqInodeIds = co_await loadInodeFromFiles(inodeDir,
                                                  {},
                                                  uint32_t{std::max(1U, std::thread::hardware_concurrency() / 2)},
                                                  parquetFormat,
                                                  &inodeDumpTime);
  if (!uniqInodeIds) co_return makeError(uniqInodeIds.error());

  const auto &doRemove = parser.get<bool>("do-remove");
  const auto &orphanedPath = parser.get<std::string>("orphaned-path");
  std::vector<Path> orphanedChunkPaths = listFilesFromPath(orphanedPath);
  size_t totalOrphanedChunks = 0;
  size_t totalRemainingOrphanedChunks = 0;
  size_t totalRemovedOrphanedChunks = 0;
  XLOGF(CRITICAL, "Removing orphaned chunks from {} files in path: {}", orphanedChunkPaths.size(), orphanedPath);

  for (size_t orphanedChunkPathIndex = 0; orphanedChunkPathIndex < orphanedChunkPaths.size();
       orphanedChunkPathIndex++) {
    // orphanedChunkPaths lives in the coroutine frame and is not modified, so a reference is safe.
    const auto &orphanedChunkPath = orphanedChunkPaths[orphanedChunkPathIndex];
    ChunkMetaTable orphanedChunkmeta;

    // load orphaned chunks
    bool ok = parquetFormat ? orphanedChunkmeta.loadFromParquetFile(orphanedChunkPath)
                            : orphanedChunkmeta.loadFromFile(orphanedChunkPath, true /*jsonFormat*/);
    if (!ok) {
      // Not FATAL: folly's FATAL level aborts the process and would make this error return unreachable.
      XLOGF(CRITICAL, "Failed to load orphaned chunks in file: {}", orphanedChunkPath);
      co_return makeError(StatusCode::kIOError);
    }

    // The orphan list must predate the inode snapshot; otherwise an inode created after the snapshot
    // could own chunks that look orphaned here (presumed rationale -- confirm with the dump tooling).
    if (orphanedChunkmeta.timestamp >= inodeDumpTime) {
      XLOGF(CRITICAL,
            "Orphaned chunk metadata dump time '{:%c}' >= inode snapshot time '{:%c}', aborting at file: {}",
            fmt::localtime(orphanedChunkmeta.timestamp),
            fmt::localtime(inodeDumpTime),
            orphanedChunkPath);
      co_return makeError(StatusCode::kInvalidArg);
    }

    // Nothing to probe or remove; this also avoids indexing into an empty read buffer below.
    if (orphanedChunkmeta.chunks.empty()) {
      XLOGF(WARN, "No orphaned chunks found in file: {}, skipping", orphanedChunkPath);
      continue;
    }
    totalOrphanedChunks += orphanedChunkmeta.chunks.size();

    // One byte of destination buffer per chunk: each probe read lands in its own slot.
    std::vector<uint8_t> readBuffer(orphanedChunkmeta.chunks.size());
    auto ioBuffer = env.storageClientGetter()->registerIOBuffer(readBuffer.data(), readBuffer.size());
    if (!ioBuffer) co_return makeError(ioBuffer.error());

    std::vector<storage::client::ReadIO> readIOs;
    readIOs.reserve(orphanedChunkmeta.chunks.size());
    for (const auto &orphanedChunkRow : orphanedChunkmeta.chunks) {
      auto metaChunkId = meta::ChunkId::unpack(orphanedChunkRow.chunkmeta.chunkId.data());
      auto metaInodeId = metaChunkId.inode();
      // double check the inode of orphaned chunks: a chunk whose inode is still in the snapshot is
      // not orphaned, so stop instead of deleting live data
      if (uniqInodeIds->count(metaInodeId)) {
        XLOGF(CRITICAL,
              "Stop removing since inode {} of chunk {} still exists, file: {}",
              metaInodeId,
              metaChunkId,
              orphanedChunkPath);
        co_return makeError(StatusCode::kInvalidArg);
      }
      auto readIO = env.storageClientGetter()->createReadIO(orphanedChunkRow.chainId,
                                                            orphanedChunkRow.chunkmeta.chunkId,
                                                            0 /* offset*/,
                                                            1 /* length*/,
                                                            &readBuffer[readIOs.size()],
                                                            &(*ioBuffer));
      readIOs.push_back(std::move(readIO));
    }
    XLOGF_IF(FATAL,
             readIOs.size() != orphanedChunkmeta.chunks.size(),
             "Num of read IOs {} not equal to num of orphaned chunks {}, file: {}",
             readIOs.size(),
             orphanedChunkmeta.chunks.size(),
             orphanedChunkPath);

    // send batch read request to check if the orphaned chunks still there
    auto readResult = co_await env.storageClientGetter()->batchRead(readIOs, flat::UserInfo{});
    if (!readResult) {
      // Per-chunk statuses decide what happens next; chunks that could not be read are removed anyway.
      XLOGF(CRITICAL,
            "Batch read of {} orphaned chunks failed with error {}, file: {}",
            readIOs.size(),
            readResult.error(),
            orphanedChunkPath);
    }

    std::vector<storage::client::RemoveChunksOp> removeOps;
    removeOps.reserve(orphanedChunkmeta.chunks.size());
    for (const auto &readIO : readIOs) {
      if (readIO.statusCode() == StorageClientCode::kChunkNotFound) {
        XLOGF(DBG, "Orphaned chunk {} on {} does not exist, skipping", readIO.chunkId, readIO.routingTarget.chainId);
      } else {
        // Range [chunkId, ChunkId(chunkId, 1)) capped at one chunk -- presumably targets exactly this
        // chunk; NOTE(review): confirm against RemoveChunksOp range semantics.
        auto removeOp = env.storageClientGetter()->createRemoveOp(readIO.routingTarget.chainId,
                                                                  storage::ChunkId(readIO.chunkId),
                                                                  storage::ChunkId(readIO.chunkId, 1),
                                                                  1 /*maxNumChunkIdsToProcess*/);
        removeOps.push_back(std::move(removeOp));
        XLOGF_IF(WARN,
                 readIO.statusCode() != StatusCode::kOK,
                 "Failed to read orphaned chunk {} on {} with error {}, removing it anyway",
                 readIO.chunkId,
                 readIO.routingTarget.chainId,
                 readIO.status());
      }
    }
    totalRemainingOrphanedChunks += removeOps.size();
    XLOGF_IF(WARN,
             (!removeOps.empty() && removeOps.size() < readIOs.size()),
             "Only {} out of {} orphaned chunks on {} still exist, file: {}",
             removeOps.size(),
             readIOs.size(),
             orphanedChunkmeta.chainId,
             orphanedChunkPath);

    if (!doRemove) {
      XLOGF(WARN, "Skip removing {} orphaned chunks in dry run mode, file: {}", removeOps.size(), orphanedChunkPath);
      continue;
    }

    // send batch remove request
    auto removeResult = co_await env.storageClientGetter()->removeChunks(removeOps, flat::UserInfo{});
    if (!removeResult) {
      // Keep going: the per-op processed counts below still report what was actually removed.
      XLOGF(CRITICAL,
            "Batch remove of {} orphaned chunks failed with error {}, file: {}",
            removeOps.size(),
            removeResult.error(),
            orphanedChunkPath);
    }
    size_t removedChunks = std::accumulate(removeOps.begin(), removeOps.end(), size_t{0}, [](size_t s, const auto &op) {
      return s + op.numProcessedChunks();
    });
    totalRemovedOrphanedChunks += removedChunks;
    XLOGF(WARN,
          "#{}/{} Removed {}/{} orphaned chunks on {}, file: {}",
          orphanedChunkPathIndex + 1,  // 1-based progress counter for humans
          orphanedChunkPaths.size(),
          removedChunks,
          removeOps.size(),
          orphanedChunkmeta.chainId,
          orphanedChunkPath);
  }

  XLOGF(CRITICAL,
        "In total removed {} of {} remaining orphaned chunks among {} chunks found in directory: {}",
        totalRemovedOrphanedChunks,
        totalRemainingOrphanedChunks,
        totalOrphanedChunks,
        orphanedPath);
  co_return table;
}