in src/meta/store/ops/Rename.cc [236:361]
CoTryTask<RenameRsp> run(IReadWriteTransaction &txn) override {
XLOGF(DBG, "RenameOp: {}", req_);
CHECK_REQUEST(req_);
auto [srcResult, dstResult] =
co_await folly::coro::collectAll(resolve(txn, req_.user).path(req_.src, AtFlags(AT_SYMLINK_NOFOLLOW)),
resolve(txn, req_.user).path(req_.dest, AtFlags(AT_SYMLINK_NOFOLLOW)));
CO_RETURN_ON_ERROR(srcResult);
CO_RETURN_ON_ERROR(dstResult);
// check dst, transaction may already executed.
if (dstResult->dirEntry.has_value() && dstResult->dirEntry->uuid != Uuid::zero() &&
dstResult->dirEntry->uuid == req_.uuid) {
// this may happens when FDB returns commit_unknown_result, or we failed to send response to client
XLOGF(CRITICAL, "Rename already finished, dst {}, req {}, uuid {}", *dstResult->dirEntry, req_, req_.uuid);
auto inode = co_await dstResult->dirEntry->snapshotLoadInode(txn);
CO_RETURN_ON_ERROR(inode);
co_return RenameRsp(std::move(*inode));
}
// src should exists
if (!srcResult->dirEntry.has_value()) {
co_return MAKE_ERROR_F(MetaCode::kNotFound, "rename src {} not found", req_.src);
}
// check src InodeId
if (req_.inodeId && srcResult->dirEntry->id != req_.inodeId) {
co_return MAKE_ERROR_F(MetaCode::kNotFound, "rename src {}, inodeId != {}", *srcResult->dirEntry, *req_.inodeId);
}
// if src and dst points to same dir entry, do nothing
if (dstResult->dirEntry.has_value() && dstResult->dirEntry->parent == srcResult->dirEntry->parent &&
dstResult->dirEntry->name == srcResult->dirEntry->name) {
auto inode = co_await dstResult->dirEntry->snapshotLoadInode(txn);
CO_RETURN_ON_ERROR(inode);
co_return RenameRsp(std::move(*inode));
}
// move to trash shouldn't replace file already exists
if (dstResult->dirEntry.has_value() && dstResult->dirEntry->isFile() && req_.moveToTrash) {
co_return MAKE_ERROR_F(MetaCode::kExists, "rename dest {} exist", req_.dest);
}
// dst shouldn't be a non-empty directory
if (dstResult->dirEntry.has_value() && dstResult->dirEntry->isDirectory()) {
auto checkResult = co_await DirEntryList::checkEmpty(txn, dstResult->dirEntry->id);
CO_RETURN_ON_ERROR(checkResult);
bool empty = checkResult.value();
if (!empty) {
co_return MAKE_ERROR_F(MetaCode::kNotEmpty, "rename dest {} not empty", req_.dest);
}
}
// now, dst can be safely replaced (not exist, empty directory, file, symlink).
std::optional<Path> origPath;
if (srcResult->dirEntry->isDirectory()) {
if (dstResult->dirEntry.has_value() && !dstResult->dirEntry->isDirectory()) {
// man 2 rename: oldpath can specify a directory. In this case, newpath must either not exist, or it must
// specify an empty directory.
co_return makeError(MetaCode::kNotDirectory);
}
CO_RETURN_ON_ERROR(co_await checkLoop(txn, *srcResult, *dstResult, origPath));
}
// permission check
std::optional<Inode> srcInode, dstInode;
CO_RETURN_ON_ERROR(co_await checkPermission(txn, *srcResult, srcInode, false));
CO_RETURN_ON_ERROR(co_await checkPermission(txn, *dstResult, dstInode, true));
// NOTE: add src/dst's parent inode and dirEntry into read conflict set.
CO_RETURN_ON_ERROR(co_await Inode(srcResult->getParentId()).addIntoReadConflict(txn));
CO_RETURN_ON_ERROR(co_await srcResult->dirEntry->addIntoReadConflict(txn));
CO_RETURN_ON_ERROR(co_await Inode(dstResult->getParentId()).addIntoReadConflict(txn));
CO_RETURN_ON_ERROR(
co_await DirEntry(dstResult->getParentId(), req_.dest.path->filename().native()).addIntoReadConflict(txn));
auto &srcEntry = srcResult->dirEntry.value();
auto inodeResult = co_await srcEntry.loadInode(txn);
CO_RETURN_ON_ERROR(inodeResult);
auto &inode = inodeResult.value();
if (srcEntry.isDirectory()) {
// NOTE: add src's inode into read conflict set.
// load inode and update it's parent, read modify write, should use load.
inode.asDirectory().parent = dstResult->getParentId();
inode.asDirectory().name = req_.dest.path->filename().native();
auto updateInodeResult = co_await inode.store(txn);
CO_RETURN_ON_ERROR(updateInodeResult);
}
// remove src entry and dst entry
CO_RETURN_ON_ERROR(co_await srcEntry.remove(txn));
auto removeDstResult = co_await removeDst(txn, *dstResult, dstInode);
CO_RETURN_ON_ERROR(removeDstResult);
auto &oldDst = *removeDstResult;
// create dst entry
DirEntry newDstEntry(dstResult->getParentId(), req_.dest.path->filename().native());
newDstEntry.data() = srcEntry.data();
newDstEntry.uuid = req_.uuid;
CO_RETURN_ON_ERROR(co_await newDstEntry.store(txn));
auto &event = addEvent(Event::Type::Rename)
.addField("srcParent", srcEntry.parent)
.addField("srcName", srcEntry.name)
.addField("dstParent", newDstEntry.parent)
.addField("dstName", newDstEntry.name)
.addField("inode", newDstEntry.id)
.addField("user", req_.user.uid)
.addField("host", req_.client.hostname);
addTrace(MetaEventTrace{.eventType = Event::Type::Rename,
.inodeId = newDstEntry.id,
.parentId = srcEntry.parent,
.entryName = srcEntry.name,
.dstParentId = newDstEntry.parent,
.dstEntryName = newDstEntry.name,
.userId = req_.user.uid,
.client = req_.client,
.origPath = origPath.value_or(Path())});
if (oldDst.has_value()) {
auto [oldDstInode, oldDstNlink] = *oldDst;
event.addField("oldDstInode", oldDstInode).addField("oldDstNlink", oldDstNlink);
}
if (origPath.has_value()) {
event.addField("origPath", origPath->string());
}
co_return RenameRsp(std::move(inode));
}