CoTryTask run()

in src/meta/store/ops/Remove.cc [64:192]


  CoTryTask<RemoveRsp> run(IReadWriteTransaction &txn) override {
    XLOGF(DBG, "RemoveOp: {}", req_);

    CHECK_REQUEST(req_);

    Result<PathResolveOp::ResolveResult> resolveResult = makeError(MetaCode::kFoundBug);
    if (req_.path.path.has_value()) {
      resolveResult = co_await resolve(txn, req_.user)
                          .path(req_.path, AtFlags(AT_SYMLINK_NOFOLLOW) /* remove shouldn't follow symlink */);
    } else {
      resolveResult = co_await resolve(txn, req_.user).byDirectoryInodeId(req_.path.parent);
    }
    CO_RETURN_ON_ERROR(resolveResult);

    if (!resolveResult->dirEntry.has_value()) {
      co_return makeError(MetaCode::kNotFound);
    } else if (resolveResult->dirEntry->id.isTreeRoot()) {
      // don't permit remove root
      co_return MAKE_ERROR_F(MetaCode::kNoPermission, "Can't remove tree root {}", *resolveResult->dirEntry);
    }
    // check src InodeId
    if (req_.inodeId && resolveResult->dirEntry->id != req_.inodeId) {
      co_return MAKE_ERROR_F(MetaCode::kNotFound, "remove {}, inodeId != {}", *resolveResult->dirEntry, *req_.inodeId);
    }

    // check permission, must have write permission to parent directory, and not locked
    auto parent = co_await resolveResult->getParentInode(txn);
    CO_RETURN_ON_ERROR(parent);
    CO_RETURN_ON_ERROR(parent->acl.checkPermission(req_.user, AccessType::WRITE));
    CO_RETURN_ON_ERROR(parent->asDirectory().checkLock(req_.client));
    auto &entry = resolveResult->dirEntry.value();
    if (req_.checkType) {
      if (req_.atFlags.contains(AT_REMOVEDIR) && entry.isFile()) {
        co_return makeError(MetaCode::kNotDirectory);
      }
      if (!req_.atFlags.contains(AT_REMOVEDIR) && entry.isDirectory()) {
        co_return makeError(MetaCode::kIsDirectory);
      }
    }

    // The sticky bit (S_ISVTX) on a directory means that a file in that directory can be renamed or deleted
    // only by the owner of the file, by the owner of the directory, and by a privileged process.
    auto loadInodeResult = co_await entry.snapshotLoadInode(txn);
    CO_RETURN_ON_ERROR(loadInodeResult);
    auto &inode = *loadInodeResult;
    if ((parent->acl.perm & S_ISVTX) && req_.user.uid != parent->acl.uid && !req_.user.isRoot() &&
        req_.user.uid != inode.acl.uid) {
      auto msg = fmt::format("can't remove {}, S_ISVTX set on parent {} {}", entry, parent->id, parent->acl);
      XLOG(DBG, msg);
      co_return makeError(MetaCode::kNoPermission, msg);
    }
    if (inode.acl.iflags & FS_IMMUTABLE_FL) {
      auto msg = fmt::format("can't remove {}, FS_IMMUTABLE_FL set on inode", entry);
      XLOG(DBG, msg);
      co_return makeError(MetaCode::kNoPermission, msg);
    }

    auto type = std::string(magic_enum::enum_name(inode.getType()));
    folly::toLowerAscii(type);

    auto event = Event(Event::Type::Remove);
    event.addField("parent", entry.parent)
        .addField("name", entry.name)
        .addField("inode", entry.id)
        .addField("type", type)
        .addField("owner", entry.dirAcl->uid)
        .addField("nlink", inode.nlink - 1)
        .addField("user", req_.user.uid)
        .addField("host", req_.client.hostname);
    auto trace = MetaEventTrace{.eventType = Event::Type::Remove,
                                .inodeId = entry.id,
                                .parentId = entry.parent,
                                .entryName = entry.name,
                                .ownerId = inode.acl.uid,
                                .userId = req_.user.uid,
                                .client = req_.client,
                                .inodeType = inode.getType(),
                                .nlink = inode.nlink,
                                .recursiveRemove = req_.recursive};

    auto gcInfo = GcInfo{req_.user.uid, entry.name};
    if (entry.isDirectory()) {
      event.addField("recursive", req_.recursive);

      auto result = co_await DirEntryList::checkEmpty(txn, entry.id);
      CO_RETURN_ON_ERROR(result);
      if (auto empty = *result; empty) {
        XLOGF_IF(DFATAL, inode.nlink != 1, "Directory {} nlink != 1", inode);
        // remove directory directly
        CO_RETURN_ON_ERROR(co_await entry.addIntoReadConflict(txn));
        CO_RETURN_ON_ERROR(co_await inode.addIntoReadConflict(txn));
        CO_RETURN_ON_ERROR(co_await entry.remove(txn));
        CO_RETURN_ON_ERROR(co_await inode.remove(txn));

        addEvent(event);
        addTrace(std::move(trace));

        co_return RemoveRsp{};
      }

      if (!req_.recursive) {
        co_return makeError(MetaCode::kNotEmpty);
      }

      CO_RETURN_ON_ERROR(inode.acl.checkRecursiveRmPerm(req_.user, config().recursive_remove_check_owner()));
      auto recursiveCheck = config().recursive_remove_perm_check();
      if (recursiveCheck) {
        auto res = co_await DirEntryList::recursiveCheckRmPerm(txn, inode.id, req_.user, recursiveCheck, 128);
        CO_RETURN_ON_ERROR(res);
      }

      // recursive remove, save original path
      auto ancestors = std::vector<Inode>();
      CO_RETURN_ON_ERROR(co_await Inode::loadAncestors(txn, ancestors, entry.parent));
      for (auto &ancestor : ancestors) {
        gcInfo.origPath = ancestor.asDirectory().name / gcInfo.origPath;
      }
      event.addField("origPath", gcInfo.origPath.string());
      trace.origPath = gcInfo.origPath;
    }

    auto result = co_await gcManager().removeEntry(txn, entry, inode, gcInfo);
    CO_RETURN_ON_ERROR(result);

    addEvent(event);
    addTrace(std::move(trace));

    co_return RemoveRsp{};
  }