src/meta/service/MetaOperator.cc (384 lines of code) (raw):
#include "meta/service/MetaOperator.h"
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>
#include <fcntl.h>
#include <fmt/core.h>
#include <folly/Conv.h>
#include <folly/Expected.h>
#include <folly/Overload.h>
#include <folly/Random.h>
#include <folly/ScopeGuard.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Invoke.h>
#include <folly/experimental/coro/Sleep.h>
#include <folly/functional/Invoke.h>
#include <folly/functional/Partial.h>
#include <folly/logging/xlog.h>
#include <functional>
#include <memory>
#include <optional>
#include <type_traits>
#include <unistd.h>
#include <utility>
#include <vector>
#include "common/app/NodeId.h"
#include "common/kv/ITransaction.h"
#include "common/kv/WithTransaction.h"
#include "common/monitor/Recorder.h"
#include "common/serde/ClientContext.h"
#include "common/utils/BackgroundRunner.h"
#include "common/utils/CPUExecutorGroup.h"
#include "common/utils/Coroutine.h"
#include "common/utils/CoroutinesPool.h"
#include "common/utils/Duration.h"
#include "common/utils/FaultInjection.h"
#include "common/utils/Result.h"
#include "common/utils/RobinHood.h"
#include "common/utils/UtcTime.h"
#include "core/user/UserToken.h"
#include "fbs/meta/Service.h"
#include "fbs/meta/Utils.h"
#include "fdb/FDBRetryStrategy.h"
#include "meta/components/ChainAllocator.h"
#include "meta/components/Distributor.h"
#include "meta/components/FileHelper.h"
#include "meta/components/Forward.h"
#include "meta/components/InodeIdAllocator.h"
#include "meta/components/SessionManager.h"
#include "meta/store/Idempotent.h"
#include "meta/store/Inode.h"
#include "meta/store/MetaStore.h"
#include "meta/store/Operation.h"
#include "meta/store/PathResolve.h"
#include "meta/store/Utils.h"
#include "meta/store/ops/BatchOperation.h"
// Authenticates `user` when config_.authenticate() is enabled; the result of
// `co_await authenticate(user)` is handed to CO_RETURN_ON_ERROR.
// Uses co_await and the `config_` member, so it can only be expanded inside a
// coroutine member function of MetaOperator.
#define AUTHENTICATE(user) \
do { \
if (config_.authenticate()) { \
CO_RETURN_ON_ERROR(co_await authenticate(user)); \
} \
} while (0)
namespace hf3fs::meta::server {
using namespace std::chrono_literals;
// Runs one MetaStore operation inside a read-write KV transaction.
//
// `func` is a pointer to a MetaStore member function; it is invoked on *metaStore_ with `arg`
// and yields the operation object that OperationDriver executes. The coroutine's value type is
// that operation's RspT.
template <typename Func, typename Arg>
auto MetaOperator::runOp(Func &&func, Arg &&arg)
-> CoTryTask<typename std::invoke_result_t<Func, MetaStore, Arg &&>::element_type::RspT> {
#ifndef NDEBUG
// Debug builds only: keep the object returned by FaultInjection::clone() alive for this coroutine.
auto fi = FaultInjection::clone();
#endif
// The deadline stays empty unless Arg derives from ReqBase and operation_timeout is non-zero.
auto deadline = std::optional<SteadyTime>();
if constexpr (std::is_base_of_v<ReqBase, std::remove_reference_t<Arg>>) {
// Requests are validated before a transaction or operation is created.
CO_RETURN_ON_ERROR(arg.valid());
if (config_.operation_timeout() != 0_s) {
deadline = SteadyClock::now() + config_.operation_timeout();
}
}
auto txn = kvEngine_->createReadWriteTransaction();
auto op = ((*metaStore_).*func)(std::forward<Arg>(arg));
// NOTE(review): `arg` is read again here after std::forward; the callers visible in this file
// all pass lvalues, so nothing is moved from. Confirm before calling with an rvalue.
auto driver = OperationDriver(*op, arg, deadline);
co_return co_await driver.run(std::move(txn), createRetryConfig(), config_.readonly(), config_.grv_cache());
}
// Executes a batched operation `op` in a read-write KV transaction and then updates the
// per-inode entry in `batches_`.
//
// `inodeId`  - inode the batch is expected to produce; a successful result with a different id
//              is logged at FATAL level.
// `op`       - the batched operation to run; must be non-null (asserted).
// `deadline` - optional deadline forwarded to OperationDriver.
// Returns the driver's result unchanged.
CoTryTask<Inode> MetaOperator::runBatch(InodeId inodeId,
std::unique_ptr<BatchedOp> op,
std::optional<SteadyTime> deadline) {
#ifndef NDEBUG
// Debug builds only: keep the object returned by FaultInjection::clone() alive for this coroutine.
auto fi = FaultInjection::clone();
#endif
assert(op);
auto txn = kvEngine_->createReadWriteTransaction();
auto driver = OperationDriver(*op, Void{}, deadline);
auto result = co_await driver.run(std::move(txn), createRetryConfig(), config_.readonly(), config_.grv_cache());
if (!result.hasError()) {
XLOGF_IF(FATAL, inodeId != result->id, "expected {}, get {}", inodeId, result->id);
}
// Bookkeeping runs whether or not the batch succeeded: look up the entry for this inode, call
// wakeupNext() on it, and erase the entry when wakeupNext() returns false.
batches_.withLock(
[&](auto &map) {
auto iter = map.find(op->inodeId_);
XLOGF_IF(FATAL, iter == map.end(), "shouldn't happen");
if (!iter->second.wakeupNext()) {
map.erase(iter);
}
},
op->inodeId_);
co_return result;
}
// Queues `req` as part of the batch for `inodeId` and waits for its result.
//
// The request is validated first. It is then wrapped in a BatchedOp::Waiter and registered via
// addBatchReq(). After the waiter's baton is signalled, a non-null operation returned by
// addBatchReq() is executed here through runBatch(); the value returned by runBatch() is not
// inspected. The response is always taken from the waiter.
template <typename Req, typename Rsp>
CoTryTask<Rsp> MetaOperator::runInBatch(InodeId inodeId, Req req) {
  CO_RETURN_ON_ERROR(req.valid());

  // An empty deadline means "no timeout" (operation_timeout == 0).
  std::optional<SteadyTime> deadline;
  if (config_.operation_timeout() != 0_s) {
    deadline = SteadyClock::now() + config_.operation_timeout();
  }

  // The recorder guard must be built before `req` is moved into the waiter.
  OperationRecorder::Guard recorderGuard(OperationRecorder::server(), MetaSerde<>::getRpcName(req), req.user.uid);
  BatchedOp::Waiter<Req, Rsp> waiter(std::move(req));

  auto batchOp = addBatchReq(inodeId, waiter);
  co_await waiter.baton;
  if (batchOp) {
    co_await runBatch(inodeId, std::move(batchOp), deadline);
  }

  auto response = waiter.getResult();
  recorderGuard.finish(response);
  co_return response;
}
// Builds the operator and all components it owns from the given configuration, node id,
// KV engine, mgmtd client, storage client and forwarder.
MetaOperator::MetaOperator(const Config &cfg,
flat::NodeId nodeId,
std::shared_ptr<kv::IKVEngine> kvEngine,
std::shared_ptr<client::ICommonMgmtdClient> mgmtdClient,
std::shared_ptr<storage::client::StorageClient> storageClient,
std::unique_ptr<Forward> forward)
: config_(cfg),
nodeId_(nodeId),
metaEventTraceLog_(config_.event_trace_log()),
kvEngine_(kvEngine),
mgmtd_(mgmtdClient),
distributor_(std::make_shared<Distributor>(cfg.distributor(), nodeId, kvEngine)),
userStore_(std::make_shared<core::UserStoreEx>(*kvEngine_, config_.retry_transaction(), config_.user_cache())),
inodeIdAlloc_(InodeIdAllocator::create(kvEngine)),
chainAlloc_(std::make_shared<ChainAllocator>(mgmtdClient)),
fileHelper_(std::make_shared<FileHelper>(cfg, mgmtdClient, storageClient)),
// The initializers below read members initialized above (fileHelper_, inodeIdAlloc_, ...).
sessionManager_(
std::make_shared<SessionManager>(cfg.session_manager(), nodeId, kvEngine_, mgmtdClient, fileHelper_)),
gcManager_(std::make_shared<GcManager>(cfg,
nodeId,
metaEventTraceLog_,
kvEngine_,
mgmtdClient,
inodeIdAlloc_,
fileHelper_,
sessionManager_,
userStore_)),
forward_(std::move(forward)),
metaStore_(std::make_unique<MetaStore>(cfg,
metaEventTraceLog_,
distributor_,
inodeIdAlloc_,
chainAlloc_,
fileHelper_,
sessionManager_,
gcManager_)) {
// Give the session manager a close callback that goes through MetaOperator::close() and maps a
// successful response to Void. The lambda captures this object by reference.
// NOTE(review): the callback must not be invoked after this MetaOperator is destroyed.
sessionManager_->setCloseFunc(
[&](const auto &req) -> CoTryTask<void> { co_return (co_await close(req)).then([](auto &) { return Void{}; }); });
}
// Initializes the operator.
//
// When `layout` is provided, MetaStore::initFs is run with it first. Afterwards the event trace
// log is opened (failure yields StatusCode::kIOError) and the GC manager is initialized.
// Returns the first error encountered, or Void on success.
CoTryTask<void> MetaOperator::init(std::optional<Layout> layout) {
  XLOGF(INFO, "MetaOperator::init");

  if (layout) {
    auto initFsResult = co_await runOp(&MetaStore::initFs, *layout);
    CO_RETURN_ON_ERROR(initFsResult);
  }

  if (!metaEventTraceLog_.open()) {
    XLOGF(CRITICAL, "Failed to open trace log in directory: {}", config_.event_trace_log().trace_file_dir());
    co_return makeError(StatusCode::kIOError);
  }

  auto gcInitResult = co_await gcManager_->init();
  CO_RETURN_ON_ERROR(gcInitResult);

  XLOGF(INFO, "MetaOperator::init success.");
  co_return Void{};
}
// Starts the distributor, file helper, GC manager and session manager on `exec`, then creates a
// BackgroundRunner on one executor picked from `exec` and registers the "idempotent_clean" task.
void MetaOperator::start(CPUExecutorGroup &exec) {
XLOGF(INFO, "MetaOperator::start");
distributor_->start(exec);
fileHelper_->start(exec);
gcManager_->start(exec);
sessionManager_->start(exec);
bgRunner_ = std::make_unique<BackgroundRunner>(&exec.randomPick());
bgRunner_->start(
"idempotent_clean",
[&]() -> CoTask<void> {
// The task does nothing unless isFirstMeta() holds for this node.
if (!isFirstMeta(*mgmtd_, nodeId_)) co_return;
// `prev` is fed back into Idempotent::clean on every round and replaced by result->first;
// presumably it is the scan continuation point -- confirm against Idempotent::clean.
auto prev = std::optional<std::string>();
size_t total = 0, cleaned = 0;
auto more = true;
// Keep going until clean() reports nothing more to do, a round fails, or stop_ is set.
while (more && !stop_) {
// Per-round counters filled in by Idempotent::clean; accumulated only when the round succeeds.
size_t t = 0, c = 0;
auto strategy = kv::FDBRetryStrategy(createRetryConfig());
auto txn = kvEngine_->createReadWriteTransaction();
auto result = co_await kv::WithTransaction(strategy).run(
std::move(txn),
[&](kv::IReadWriteTransaction &txn) -> CoTryTask<std::pair<std::string, bool>> {
// NOTE(review): 2048 looks like a per-round limit -- confirm against Idempotent::clean.
co_return co_await Idempotent::clean(txn, prev, config_.idempotent_record_expire(), 2048, t, c);
});
if (!result) {
XLOGF(ERR, "Clean idempotent record failed, {}", result.error());
break;
}
total += t;
cleaned += c;
prev = result->first;
more = result->second;
}
XLOGF(INFO, "Clean idempotent record, total {}, cleaned {}", total, cleaned);
co_return;
},
// Passed to BackgroundRunner::start along with the task; presumably supplies the run
// interval -- confirm against BackgroundRunner.
config_.idempotent_record_clean_getter());
}
// First shutdown phase: sets stop_ (polled by the idempotent_clean loop in start()) and stops
// the distributor if it exists.
void MetaOperator::beforeStop() {
  XLOGF(INFO, "MetaOperator::beforeStop");

  stop_ = true;
  if (distributor_ != nullptr) {
    distributor_->stopAndJoin(true);
  }

  XLOGF(INFO, "MetaOperator::beforeStop finished");
}
// Second shutdown phase. Stops, in this order: the background runner (blocking until all of its
// tasks have stopped, then releasing it), the GC manager, the session manager and the file
// helper. Finally closes the event trace log.
void MetaOperator::afterStop() {
  XLOGF(INFO, "MetaOperator::afterStop");

  if (bgRunner_ != nullptr) {
    folly::coro::blockingWait(bgRunner_->stopAll());
    bgRunner_.reset();
  }

  if (gcManager_ != nullptr) {
    gcManager_->stopAndJoin();
  }

  if (sessionManager_ != nullptr) {
    sessionManager_->stopAndJoin();
  }

  if (fileHelper_ != nullptr) {
    fileHelper_->stopAndJoin();
  }

  metaEventTraceLog_.close();
  XLOGF(INFO, "MetaOperator::afterStop finished");
}
// Builds the FDB retry configuration from the `retry_transaction` config section: max backoff,
// max retry count, and a third field that is always set to true.
kv::FDBRetryStrategy::Config MetaOperator::createRetryConfig() const {
  const auto &retryCfg = config_.retry_transaction();
  return kv::FDBRetryStrategy::Config{retryCfg.max_backoff(), retryCfg.max_retry_count(), true};
}
// Authenticates `userInfo` against the user store.
//
// A scope guard adds one sample, tagged with the uid, to the "meta_server.auth_failed" counter
// on every exit path except the successful one, where the guard is dismissed.
CoTryTask<void> MetaOperator::authenticate(UserInfo &userInfo) {
  static monitor::CountRecorder authFailures("meta_server.auth_failed");

  auto recordFailure = folly::makeGuard([&userInfo]() {
    authFailures.addSample(1, {{"uid", folly::to<std::string>(userInfo.uid.toUnderType())}});
  });

  CO_RETURN_ON_ERROR(co_await userStore_->authenticate(userInfo));

  // Authentication succeeded: do not count a failure.
  recordFailure.dismiss();
  co_return Void{};
}
// Handles an Auth request: authenticates req.user when authentication is enabled and returns an
// AuthRsp built from req.user.
CoTryTask<AuthRsp> MetaOperator::authenticate(AuthReq req) {
  AUTHENTICATE(req.user);
  AuthRsp rsp(std::move(req.user));
  co_return rsp;
}
// Handles a StatFs request: authenticates the caller when enabled, then runs MetaStore::statFs.
CoTryTask<StatFsRsp> MetaOperator::statFs(StatFsReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::statFs, req);
  co_return result;
}
// Handles a Stat request: authenticates the caller when enabled, then runs MetaStore::stat.
CoTryTask<StatRsp> MetaOperator::stat(StatReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::stat, req);
  co_return result;
}
// Handles a BatchStat request: authenticates the caller when enabled, then runs
// MetaStore::batchStat.
CoTryTask<BatchStatRsp> MetaOperator::batchStat(BatchStatReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::batchStat, req);
  co_return result;
}
// Handles a BatchStatByPath request: authenticates the caller when enabled, then runs
// MetaStore::batchStatByPath.
CoTryTask<BatchStatByPathRsp> MetaOperator::batchStatByPath(BatchStatByPathReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::batchStatByPath, req);
  co_return result;
}
// Handles a GetRealPath request: authenticates the caller when enabled, then runs
// MetaStore::getRealPath.
CoTryTask<GetRealPathRsp> MetaOperator::getRealPath(GetRealPathReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::getRealPath, req);
  co_return result;
}
// Handles an Open request: authenticates the caller when enabled, then runs MetaStore::open.
CoTryTask<OpenRsp> MetaOperator::open(OpenReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::open, req);
  co_return result;
}
// Truncate is no longer served: logs the calling client's hostname at CRITICAL level and always
// fails with StatusCode::kNotImplemented. No authentication is performed.
// NOTE(review): the runtime messages spell "deprecated" as "deperated"; kept as-is here.
CoTryTask<TruncateRsp> MetaOperator::truncate(TruncateReq req) {
  const auto &clientHost = req.client.hostname;
  XLOGF(CRITICAL, "truncate is deperated, update client {}", clientHost);
  co_return makeError(StatusCode::kNotImplemented, "truncate is deperated, update client");
}
// Handles a Sync request. The distributor decides which meta server owns req.inode: requests
// owned by another node are forwarded there, requests owned by this node go through the
// per-inode batch.
CoTryTask<SyncRsp> MetaOperator::sync(SyncReq req) {
  // NOTE: don't auth user for sync
  auto owner = distributor_->getServer(req.inode);
  const bool isLocal = (owner == distributor_->nodeId());
  if (!isLocal) {
    co_return co_await forward_->forward<SyncReq, SyncRsp>(owner, std::move(req));
  }

  // Copy the id out first: `req` is moved in the same call expression.
  auto inodeId = req.inode;
  co_return co_await runInBatch<SyncReq, SyncRsp>(inodeId, std::move(req));
}
// Handles a Close request. The distributor decides which meta server owns req.inode: requests
// owned by another node are forwarded there, requests owned by this node go through the
// per-inode batch.
CoTryTask<CloseRsp> MetaOperator::close(CloseReq req) {
  // Note: don't auth user here
  auto owner = distributor_->getServer(req.inode);
  const bool isLocal = (owner == distributor_->nodeId());
  if (!isLocal) {
    co_return co_await forward_->forward<CloseReq, CloseRsp>(owner, std::move(req));
  }

  // Copy the id out first: `req` is moved in the same call expression.
  auto inodeId = req.inode;
  co_return co_await runInBatch<CloseReq, CloseRsp>(inodeId, std::move(req));
}
// Handles a Create request.
//
// After authentication and validation, a path that has a parent component is first passed to
// MetaStore::tryOpen. If that succeeds, or the path still has a parent component afterwards, its
// result is returned directly. Otherwise the request is re-validated and routed by
// req.path.parent: forwarded to the owning meta server, or batched locally when this node owns it.
CoTryTask<CreateRsp> MetaOperator::create(CreateReq req) {
  AUTHENTICATE(req.user);
  CO_RETURN_ON_ERROR(req.valid());
  XLOGF(DBG, "create {}", req);

  if (req.path.path->has_parent_path()) {
    // try open first.
    // NOTE(review): `req` is passed as an lvalue and its path is re-checked below, which suggests
    // tryOpen may rewrite req.path -- confirm against MetaStore::tryOpen.
    auto openResult = co_await runOp(&MetaStore::tryOpen, req);
    if (openResult.hasValue() || req.path.path->has_parent_path()) {
      co_return openResult;
    }
    if (!req.valid()) {
      auto msg = fmt::format("req {} not valid after try open", req);
      XLOG(DFATAL, msg);
      co_return makeError(MetaCode::kFoundBug, std::move(msg));
    }
    XLOGF(DBG, "create {}", req);
  }

  auto owner = distributor_->getServer(req.path.parent);
  const bool isLocal = (owner == distributor_->nodeId());
  if (!isLocal) {
    co_return co_await forward_->forward<CreateReq, CreateRsp>(owner, std::move(req));
  }

  // Copy the id out first: `req` is moved in the same call expression.
  auto parentId = req.path.parent;
  co_return co_await runInBatch<CreateReq, CreateRsp>(parentId, std::move(req));
}
// Handles a Mkdirs request: authenticates the caller when enabled, then runs MetaStore::mkdirs.
CoTryTask<MkdirsRsp> MetaOperator::mkdirs(MkdirsReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::mkdirs, req);
  co_return result;
}
// Handles a Symlink request: authenticates the caller when enabled, then runs MetaStore::symlink.
CoTryTask<SymlinkRsp> MetaOperator::symlink(SymlinkReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::symlink, req);
  co_return result;
}
// Handles a Remove request: authenticates the caller when enabled, then runs MetaStore::remove.
CoTryTask<RemoveRsp> MetaOperator::remove(RemoveReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::remove, req);
  co_return result;
}
// Handles a Rename request: authenticates the caller when enabled, then runs MetaStore::rename.
CoTryTask<RenameRsp> MetaOperator::rename(RenameReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::rename, req);
  co_return result;
}
// Handles a List request: authenticates the caller when enabled, then runs MetaStore::list.
CoTryTask<ListRsp> MetaOperator::list(ListReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::list, req);
  co_return result;
}
// Handles a HardLink request: authenticates the caller when enabled, then runs
// MetaStore::hardLink.
CoTryTask<HardLinkRsp> MetaOperator::hardLink(HardLinkReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::hardLink, req);
  co_return result;
}
// Handles a SetAttr request after authentication.
//
// Requests that carry a path (req.path.path is set) run MetaStore::setAttr directly. Requests
// without a path are routed by req.path.parent: forwarded to the owning meta server, or batched
// locally when this node owns it.
CoTryTask<SetAttrRsp> MetaOperator::setAttr(SetAttrReq req) {
  AUTHENTICATE(req.user);

  if (req.path.path) {
    auto result = co_await runOp(&MetaStore::setAttr, req);
    co_return result;
  }

  auto owner = distributor_->getServer(req.path.parent);
  const bool isLocal = (owner == distributor_->nodeId());
  if (!isLocal) {
    co_return co_await forward_->forward<SetAttrReq, SetAttrRsp>(owner, std::move(req));
  }

  // Copy the id out first: `req` is moved in the same call expression.
  auto parentId = req.path.parent;
  co_return co_await runInBatch<SetAttrReq, SetAttrRsp>(parentId, std::move(req));
}
// Handles a LockDirectory request: authenticates the caller when enabled, then runs
// MetaStore::lockDirectory.
CoTryTask<LockDirectoryRsp> MetaOperator::lockDirectory(LockDirectoryReq req) {
  AUTHENTICATE(req.user);
  auto result = co_await runOp(&MetaStore::lockDirectory, req);
  co_return result;
}
// Handles a PruneSession request by running MetaStore::pruneSession. No authentication is
// performed here.
CoTryTask<PruneSessionRsp> MetaOperator::pruneSession(PruneSessionReq req) {
  auto result = co_await runOp(&MetaStore::pruneSession, req);
  co_return result;
}
// Clears entries from the user store's cache: everything when req.dropAll is set, otherwise only
// the entry for req.uid when a uid is given. Always returns an empty DropUserCacheRsp. No
// authentication is performed here.
CoTryTask<DropUserCacheRsp> MetaOperator::dropUserCache(DropUserCacheReq req) {
  if (req.dropAll) {
    userStore_->cache().clear();
    co_return DropUserCacheRsp{};
  }

  if (req.uid) {
    userStore_->cache().clear(*req.uid);
  }
  co_return DropUserCacheRsp{};
}
// Handles a TestRpc request by running MetaStore::testRpc.
CoTryTask<TestRpcRsp> MetaOperator::testRpc(TestRpcReq req) {
  // don't need auth user
  auto result = co_await runOp(&MetaStore::testRpc, req);
  co_return result;
}
} // namespace hf3fs::meta::server