aios/storage/indexlib/file_system/LogicalFileSystem.cpp (1,469 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "indexlib/file_system/LogicalFileSystem.h"
#include <algorithm>
#include <assert.h>
#include <map>
#include <mutex>
#include <ostream>
#include <unordered_set>
#include <utility>
#include "autil/CommonMacros.h"
#include "autil/StringUtil.h"
#include "autil/TimeUtility.h"
#include "fslib/common/common_type.h"
#include "indexlib/file_system/DirectoryOption.h"
#include "indexlib/file_system/EntryTable.h"
#include "indexlib/file_system/ErrorCode.h"
#include "indexlib/file_system/FileInfo.h"
#include "indexlib/file_system/FileSystemDefine.h"
#include "indexlib/file_system/FileSystemOptions.h"
#include "indexlib/file_system/ListOption.h"
#include "indexlib/file_system/MergeDirsOption.h"
#include "indexlib/file_system/MountOption.h"
#include "indexlib/file_system/RemoveOption.h"
#include "indexlib/file_system/Storage.h"
#include "indexlib/file_system/StorageMetrics.h"
#include "indexlib/file_system/WriterOption.h"
#include "indexlib/file_system/file/FileReader.h"
#include "indexlib/file_system/file/ResourceFile.h"
#include "indexlib/file_system/fslib/FslibWrapper.h"
#include "indexlib/file_system/load_config/LoadConfigList.h"
#include "indexlib/file_system/package/DirectoryMerger.h"
#include "indexlib/util/Exception.h"
#include "indexlib/util/PathUtil.h"
using namespace std;
using namespace autil;
using namespace indexlib::util;
namespace indexlib { namespace file_system {
AUTIL_LOG_SETUP(indexlib.file_system, LogicalFileSystem);
// Constructs a logical file system called `name`.
// - `outputRoot` is stored in normalized form (PathUtil::NormalizePath).
// - `metricProvider` is handed to the FileSystemMetricsReporter created here.
// Only the lock, the entry table builder and the metrics reporter are created in the
// constructor; the entry table and the storages are set up later by DoInit().
LogicalFileSystem::LogicalFileSystem(const std::string& name, const std::string& outputRoot,
util::MetricProviderPtr metricProvider) noexcept
: _lock(new autil::RecursiveThreadMutex())
, _name(name)
, _outputRoot(PathUtil::NormalizePath(outputRoot))
, _entryTableBuilder(new EntryTableBuilder())
{
AUTIL_LOG(INFO, "Create LogicalFS [%s] [%p] in [%s]", _name.c_str(), this, _outputRoot.c_str());
_metricsReporter.reset(new FileSystemMetricsReporter(metricProvider));
}
// Destructor: only logs the teardown (name, address, output root); no explicit cleanup is done here.
LogicalFileSystem::~LogicalFileSystem() noexcept
{
AUTIL_LOG(INFO, "~LogicalFS [%s] [%p] in [%s]", _name.c_str(), this, _outputRoot.c_str());
}
// Initializes this file system from a private copy of `fileSystemOptions`.
//
// When the options ask for a root link, the link root name is prepared first (optionally
// suffixed with "@<current time in seconds>"); the actual setup is then done by DoInit()
// while holding the file system lock.
//
// Returns FSEC_OK on success, otherwise the error reported by DoInit().
FSResult<void> LogicalFileSystem::Init(const FileSystemOptions& fileSystemOptions) noexcept
{
    AUTIL_LOG(INFO, "Begin init LogicalFS [%s] [%p] in [%s]", _name.c_str(), this, _outputRoot.c_str());
    _options = std::make_shared<FileSystemOptions>(fileSystemOptions);

    const bool linkRootEnabled = _options->useRootLink;
    const bool linkRootStamped = linkRootEnabled && _options->rootLinkWithTs;
    if (linkRootEnabled) {
        _rootLinkPath = FILE_SYSTEM_ROOT_LINK_NAME;
    }
    if (linkRootStamped) {
        // Append the creation timestamp to the link root name.
        _rootLinkPath += "@" + StringUtil::toString(TimeUtility::currentTimeInSeconds());
    }

    ScopedLock guard(*_lock);
    RETURN_IF_FS_ERROR(DoInit(), "");
    AUTIL_LOG(INFO, "End init LogicalFS [%s] [%p] in [%s]", _name.c_str(), this, _outputRoot.c_str());
    return FSEC_OK;
}
// Re-initializes the file system in place while holding the file system lock:
//   1. drops the input and output storages and clears the entry table,
//   2. rebuilds them through DoInit(),
//   3. mounts `versionId` from the output root to the logical root as FSMT_READ_ONLY.
// Returns the first error hit by DoInit() or MountVersion(), FSEC_OK otherwise.
//
// Original note: if versionId<0, Mount will not take effect and all existing data will be erased.
// NOTE(review): MountVersion() resolves versionId == INVALID_VERSIONID to the last version found
// under the root, so whether "Mount will not take effect" holds depends on what
// EntryTableBuilder does with the resolved id -- confirm.
FSResult<void> LogicalFileSystem::ReInit(int versionId) noexcept
{
AUTIL_LOG(INFO, "Begin re-init LogicalFS [%s] [%p] in [%s]", _name.c_str(), this, _outputRoot.c_str());
ScopedLock lock(*_lock);
// Release the storages before clearing the table they were created with.
_inputStorage.reset();
_outputStorage.reset();
_entryTable->Clear();
// The entry table object is kept (only cleared), so DoInit() reuses it and recreates the storages.
RETURN_IF_FS_ERROR(DoInit(), "");
RETURN_IF_FS_ERROR(MountVersion(_outputRoot, versionId, "", FSMT_READ_ONLY, nullptr), "");
AUTIL_LOG(INFO, "End re-init LogicalFS [%s] [%p] in [%s].", _name.c_str(), this, _outputRoot.c_str());
return FSEC_OK;
}
// Test helper: resets the file system to `versionId` when versionId >= 0, or to a completely
// new state when versionId < 0. It simply forwards to ReInit().
FSResult<void> LogicalFileSystem::TEST_Reset(int versionId) noexcept
{
    return ReInit(versionId);
}
// Shared initialization used by Init() and ReInit(); both call it with the file system lock
// already held, so the lock is not taken here.
//
// Creates the entry table if there is none yet, builds the storages, declares the metrics and,
// for offline file systems, makes sure the output root directory exists.
FSResult<void> LogicalFileSystem::DoInit() noexcept
{
    // A thread-local logical FS may already carry the entry table inherited from the main
    // thread; only create one when none is attached.
    if (_entryTable == nullptr) {
        _entryTable = _entryTableBuilder->CreateEntryTable(_name, _outputRoot, _options);
        assert(_outputRoot == _entryTable->GetOutputRoot());
    }

    RETURN_IF_FS_ERROR(InitStorage(), "");
    _metricsReporter->DeclareMetrics(GetFileSystemMetrics(), _options->metricPref);

    if (!_options->isOffline) {
        return FSEC_OK;
    }
    // Offline mode: create the output root up front (parents included, existing dir tolerated).
    auto mkdirEc = FslibWrapper::MkDir(_outputRoot, /*recursive=*/true, /*mayExist=*/true).Code();
    RETURN_IF_FS_ERROR(mkdirEc, "make root directory failed");
    return FSEC_OK;
}
// Commits the entry table for `versionId`, restricted to `fileList` / `dirList` (with
// `filterDirList` passed through to EntryTable::Commit), and optionally writes a final dump file.
//
// Steps, in order:
//   1. Sync(true) so pending writes are flushed before committing.
//   2. Validate `dirList`: missing entries are skipped, a non-directory yields FSEC_NOTDIR,
//      lazy directories are mounted recursively so their entries are present in the table.
//   3. Validate `fileList`: missing entries are skipped, a directory yields FSEC_ISDIR.
//   4. Under the lock: register `finalDumpFileName` (if non-empty) in the entry table, commit the
//      table, then atomically store `finalDumpFileContent` under the output root.
//   5. UpdateAfterCommit() to bring the in-memory table in line with the committed state.
//
// `finalDumpFileName` must be a bare file name (asserted). On a failed commit or a failed atomic
// store the entry added for the final dump file is removed from the entry table again.
// Returns FSEC_OK on success, FSEC_ERROR when the entry table commit fails, or the first
// error code produced by one of the steps above.
FSResult<void> LogicalFileSystem::CommitSelectedFilesAndDir(
versionid_t versionId, const std::vector<std::string>& fileList, const std::vector<std::string>& dirList,
const std::vector<std::string>& filterDirList, const std::string& finalDumpFileName,
const std::string& finalDumpFileContent, FenceContext* fenceContext) noexcept
{
AUTIL_LOG(INFO, "Begin commit version [%d] LogicalFS [%s] [%p] in [%s]", versionId, _name.c_str(), this,
_outputRoot.c_str());
assert(finalDumpFileName.empty() || PathUtil::GetFileName(finalDumpFileName) == finalDumpFileName);
RETURN_IF_FS_ERROR(Sync(true).Code(), "commit version [%d] LogicalFS [%s] in [%s] falied", versionId, _name.c_str(),
_outputRoot.c_str());
// TODO: optimize. to complete the entry meta tree
for (const std::string& dir : dirList) {
FSResult<EntryMeta> entryMetaRet = GetEntryMeta(dir);
// A directory that is not in the entry table is silently ignored.
if (entryMetaRet.ec == FSEC_NOENT) {
continue;
}
RETURN_IF_FS_ERROR(entryMetaRet.ec, "CommitSelectedFilesAndDir on dir [%s] failed", dir.c_str());
const EntryMeta& entryMeta = entryMetaRet.result;
if (!entryMeta.IsDir()) {
AUTIL_LOG(ERROR, "[%s] is not a dir!", dir.c_str());
return FSEC_NOTDIR;
}
// Lazy directories are expanded now; owned directories are mounted read-write, others read-only.
if (entryMeta.IsLazy()) {
auto ec = _entryTableBuilder->MountDirRecursive(entryMeta.GetPhysicalRoot(), dir, dir,
entryMeta.IsOwner() ? FSMT_READ_WRITE : FSMT_READ_ONLY);
RETURN_IF_FS_ERROR(ec, "mount dir failed. root[%s] path[%s] to [%s]", entryMeta.GetPhysicalRoot().c_str(),
dir.c_str(), dir.c_str());
}
}
for (const std::string& file : fileList) {
FSResult<EntryMeta> entryMetaRet = GetEntryMeta(file);
// A file that is not in the entry table is silently ignored.
if (entryMetaRet.ec == FSEC_NOENT) {
continue;
}
RETURN_IF_FS_ERROR(entryMetaRet.ec, "CommitSelectedFilesAndDir on file [%s] failed", file.c_str());
const EntryMeta& entryMeta = entryMetaRet.result;
if (entryMeta.IsDir()) {
AUTIL_LOG(ERROR, "[%s] is a dir!", file.c_str());
return FSEC_ISDIR;
}
}
// From here on the entry table is modified, so the file system lock is held.
ScopedLock lock(*_lock);
if (!finalDumpFileName.empty()) {
// Register the final dump file first so that it is part of the committed entry table.
auto [ec, entryMeta] = _entryTable->CreateFile(finalDumpFileName);
RETURN_IF_FS_ERROR(ec, "create final file [%s] failed", finalDumpFileName.c_str());
entryMeta.SetLength(finalDumpFileContent.size());
ec = _entryTable->AddEntryMeta(entryMeta).ec;
RETURN_IF_FS_ERROR(ec, "entry table add final dump file failed, [%s]", entryMeta.DebugString().c_str());
}
if (!_entryTable->Commit(versionId, fileList, dirList, filterDirList, fenceContext)) {
// Roll back the entry registered above; the file itself has not been written yet.
if (!finalDumpFileName.empty()) {
_entryTable->Delete(finalDumpFileName);
}
AUTIL_LOG(ERROR, "LogicalFS [%s] [%p] dump entry table failed", _name.c_str(), this);
return FSEC_ERROR;
}
if (!finalDumpFileName.empty()) {
// The final dump file is written only after the entry table commit succeeded.
string path = PathUtil::JoinPath(_outputRoot, finalDumpFileName);
auto ec = FslibWrapper::AtomicStore(path, finalDumpFileContent, true, fenceContext).Code();
if (ec != FSEC_OK) {
_entryTable->Delete(finalDumpFileName);
RETURN_IF_FS_ERROR(ec, "atomic store final dump file [%s] failed", path.c_str());
}
}
// Update files that are committed above from mutable to immutable etc. See Update for more
// details. This is necessary in order to:
// 1. Maintain in memory EntryTable state to be consistent with physical files/dirs, so that
// user can continue to e.g. read and write in current FS.
// 2. Files that are committed e.g. should not be mutable, so we need to update this property.
_entryTable->UpdateAfterCommit(fileList, dirList);
AUTIL_LOG(INFO, "End commit LogicalFS [%s] [%p] in [%s]", _name.c_str(), this, _outputRoot.c_str());
return FSEC_OK;
}
// Dumps the preload entry table. This must be the very first commit under the output root:
// if any version is already found there the call fails.
//
// Returns FSEC_OK on success and FSEC_ERROR when the last version cannot be determined, when
// a version already exists, or when the entry table dump fails.
FSResult<void> LogicalFileSystem::CommitPreloadDependence(FenceContext* fenceContext) noexcept
{
    ScopedLock guard(*_lock);

    auto lastVersion = EntryTableBuilder::GetLastVersion(_outputRoot);
    // "No version found" (FSEC_NOENT) is an acceptable outcome of the lookup.
    const bool lookupSucceeded = (lastVersion.ec == FSEC_OK || lastVersion.ec == FSEC_NOENT);
    if (!lookupSucceeded) {
        AUTIL_LOG(ERROR, "get last version failed!");
        return FSEC_ERROR;
    }

    if (lastVersion.result != -1) {
        AUTIL_LOG(
            ERROR,
            "LogicalFS [%s] [%p], other version file already exist! PreloadEntryTable must be the first to commit!",
            _name.c_str(), this);
        return FSEC_ERROR;
    }

    if (!_entryTable->CommitPreloadDependence(fenceContext)) {
        AUTIL_LOG(ERROR, "LogicalFS [%s] [%p] dump preload entry table failed", _name.c_str(), this);
        return FSEC_ERROR;
    }

    AUTIL_LOG(INFO, "End commit preloaddependence of LogicalFS [%s] [%p] in [%s]", _name.c_str(), this,
              _outputRoot.c_str());
    return FSEC_OK;
}
// Mounts version `versionId` found under `rawPhysicalRoot` at `rawLogicalPath`.
//
// - Only read-only mounts are supported; FSMT_READ_WRITE yields FSEC_NOTSUP.
// - versionId == INVALID_VERSIONID is replaced by the last version found under the root.
// - When `lifecycleTable` is given, each of its (path, lifecycle) pairs is registered through
//   SetDirLifecycle() before mounting.
//
// Returns the error code of EntryTableBuilder::MountVersion, or FSEC_ERROR when the last
// version cannot be listed.
FSResult<void>
LogicalFileSystem::MountVersion(const std::string& rawPhysicalRoot, versionid_t versionId,
                                const std::string& rawLogicalPath, MountOption mountOption,
                                const std::shared_ptr<indexlib::file_system::LifecycleTable>& lifecycleTable) noexcept
{
    AUTIL_LOG(DEBUG,
              "MountVersion, physicalRoot[%s], versionId[%d], logicalPath[%s], mountType[%d], conflictResolution[%d]",
              rawPhysicalRoot.c_str(), versionId, rawLogicalPath.c_str(), mountOption.mountType,
              (int)mountOption.conflictResolution);

    if (unlikely(mountOption.mountType == FSMT_READ_WRITE)) {
        assert(false);
        AUTIL_LOG(ERROR, "only support mount version READ_ONLY till now, failed mount [%s] version [%d] to [%s]",
                  rawPhysicalRoot.c_str(), versionId, rawLogicalPath.c_str());
        return FSEC_NOTSUP;
    }

    const std::string logicalPath = NormalizeLogicalPath(rawLogicalPath);
    const std::string physicalRoot = PathUtil::NormalizePath(rawPhysicalRoot);

    if (versionId == INVALID_VERSIONID) {
        // No explicit version requested: fall back to the newest one under the root.
        auto lastVersion = EntryTableBuilder::GetLastVersion(physicalRoot);
        if (!lastVersion.OK() && lastVersion.ec != FSEC_NOENT) {
            AUTIL_LOG(ERROR, "list last version failed, root[%s], ec[%d]", physicalRoot.c_str(), lastVersion.ec);
            return FSEC_ERROR;
        }
        versionId = lastVersion.result;
    }

    ScopedLock guard(*_lock);

    if (lifecycleTable != nullptr) {
        AUTIL_LOG(INFO, "mount version[%d] with lifecycleTable[size=%zu]", versionId, lifecycleTable->Size());
        for (auto iter = lifecycleTable->Begin(); iter != lifecycleTable->End(); ++iter) {
            const auto& lifecycleDir = iter->first;
            const auto& lifecycleName = iter->second;
            // Empty lifecycles are still applied, they are just not worth a log line.
            if (!lifecycleName.empty()) {
                AUTIL_LOG(INFO, "add path [%s] lifecycle [%s] to lfs", lifecycleDir.c_str(), lifecycleName.c_str());
            }
            SetDirLifecycle(lifecycleDir, lifecycleName);
        }
    }

    auto mountEc = _entryTableBuilder->MountVersion(physicalRoot, versionId, logicalPath, mountOption);
    if (mountEc != FSEC_OK) {
        AUTIL_LOG(ERROR, "mount version failed, physicalRoot [%s], vesion [%d], logicalPath [%s], ec[%d]",
                  physicalRoot.c_str(), versionId, logicalPath.c_str(), mountEc);
    }
    return mountEc;
}
// Mounts the physical directory `rawPhysicalRoot`/`rawPhysicalPath` at `rawLogicalPath`.
//
// - Mounting the output root itself onto the logical root with lazy mount enabled is a no-op.
// - Read-only mounts, and any mount with `enableLazyMount` == false, are expanded eagerly
//   (MountDirRecursive).
// - Otherwise the directory is mounted lazily, which requires ConflictResolution::OVERWRITE
//   (FSEC_BADARGS if not) and an existing physical directory (FSEC_NOTDIR if not).
FSResult<void> LogicalFileSystem::MountDir(const std::string& rawPhysicalRoot, const std::string& rawPhysicalPath,
                                           const std::string& rawLogicalPath, MountOption mountOption,
                                           bool enableLazyMount) noexcept
{
    AUTIL_LOG(DEBUG, "MountDir, physicalRoot[%s], physicalPath[%s], logicalPath[%s], mountType[%d]",
              rawPhysicalRoot.c_str(), rawPhysicalPath.c_str(), rawLogicalPath.c_str(), mountOption.mountType);

    const std::string physicalRoot = PathUtil::NormalizePath(rawPhysicalRoot);
    const std::string physicalPath = PathUtil::NormalizePath(rawPhysicalPath);
    const std::string logicalPath = NormalizeLogicalPath(rawLogicalPath);
    const auto mountType = mountOption.mountType;

    // TODO:(qingran) tmp skip mount to root
    const bool mountsOutputRootToRoot =
        PathUtil::JoinPath(physicalRoot, physicalPath) == _outputRoot && logicalPath.empty();
    if (mountsOutputRootToRoot) {
        assert(mountType != FSMT_READ_ONLY);
        // With lazy mount there is nothing to do: the output root path is lazy loaded anyway.
        if (enableLazyMount) {
            return FSEC_OK;
        }
    }

    const bool mountEagerly = (mountType == FSMT_READ_ONLY) || !enableLazyMount;
    if (mountEagerly) {
        auto recursiveEc = _entryTableBuilder->MountDirRecursive(physicalRoot, physicalPath, logicalPath, mountOption);
        RETURN_IF_FS_ERROR(recursiveEc, "mount dir root[%s]/[%s] to [%s] failed", rawPhysicalRoot.c_str(),
                           rawPhysicalPath.c_str(), rawLogicalPath.c_str());
        return FSEC_OK;
    }

    if (mountOption.conflictResolution != ConflictResolution::OVERWRITE) {
        // TODO: support lazy mount
        AUTIL_LOG(ERROR, "lazy mount only suport ConflictResolution::OVERWRITE");
        return FSEC_BADARGS;
    }

    auto isDirRet = FslibWrapper::IsDir(PathUtil::JoinPath(physicalRoot, physicalPath));
    RETURN_IF_FS_ERROR(isDirRet.ec, "check dir exist failed when mount dir [%s]/[%s] to [%s]", physicalRoot.c_str(),
                       physicalPath.c_str(), logicalPath.c_str());
    if (!isDirRet.result) {
        AUTIL_LOG(ERROR, "mount dir [%s]/[%s] to [%s] failed, physical path is not exist or not dir",
                  physicalRoot.c_str(), physicalPath.c_str(), logicalPath.c_str());
        return FSEC_NOTDIR;
    }

    auto lazyEc = _entryTableBuilder->MountDirLazy(physicalRoot, physicalPath, logicalPath, mountOption);
    RETURN_IF_FS_ERROR(lazyEc, "mount dir [%s]/[%s] to [%s] failed", physicalRoot.c_str(), physicalPath.c_str(),
                       logicalPath.c_str());
    AUTIL_LOG(INFO, "mount root [%s]/[%s] to [%s] type[%d] successfully", rawPhysicalRoot.c_str(),
              rawPhysicalPath.c_str(), rawLogicalPath.c_str(), mountType);
    return FSEC_OK;
}
// Mounts the single physical file `rawPhysicalRoot`/`rawPhysicalPath` at `rawLogicalPath`
// with the given mount type and length, under the file system lock.
//
// A missing file (FSEC_NOENT) is treated as success when `mayNonExist` is true; every other
// failure is logged and its error code returned.
FSResult<void> LogicalFileSystem::MountFile(const string& rawPhysicalRoot, const string& rawPhysicalPath,
                                            const string& rawLogicalPath, FSMountType mountType, int64_t length,
                                            bool mayNonExist) noexcept
{
    AUTIL_LOG(
        DEBUG,
        "MountFile, physicalRoot[%s], physicalPath[%s], logicalPath[%s], mountType[%d], length[%ld], mayNonExist[%d]",
        rawPhysicalRoot.c_str(), rawPhysicalPath.c_str(), rawLogicalPath.c_str(), mountType, length, mayNonExist);

    const auto physicalRoot = PathUtil::NormalizePath(rawPhysicalRoot);
    const auto physicalPath = PathUtil::NormalizePath(rawPhysicalPath);
    const auto logicalPath = NormalizeLogicalPath(rawLogicalPath);

    ScopedLock guard(*_lock);
    auto mountEc = _entryTableBuilder->MountFile(physicalRoot, physicalPath, logicalPath, length, mountType);
    if (mountEc == FSEC_OK) {
        return mountEc;
    }

    if (mountEc != FSEC_NOENT) {
        AUTIL_LOG(ERROR, "mount file failed. root[%s] path[%s] to [%s], ec[%d]", rawPhysicalRoot.c_str(),
                  rawPhysicalPath.c_str(), rawLogicalPath.c_str(), mountEc);
        return mountEc;
    }

    // The physical file does not exist.
    if (mayNonExist) {
        return FSEC_OK;
    }
    AUTIL_LOG(ERROR, "mount file failed. root[%s] path[%s] to [%s], not exist", rawPhysicalRoot.c_str(),
              rawPhysicalPath.c_str(), rawLogicalPath.c_str());
    return mountEc;
}
// Expands a lazily mounted segment directory at `rawLogicalPath`.
//
// The path must exist in the entry table and be a directory (FSEC_ERROR otherwise). A
// directory that is not lazy is left untouched. Owned directories are mounted read-write,
// all others read-only.
FSResult<void> LogicalFileSystem::MountSegment(const std::string& rawLogicalPath) noexcept
{
    const auto segmentPath = NormalizeLogicalPath(rawLogicalPath);
    AUTIL_LOG(INFO, "Mount segment [%s]", rawLogicalPath.c_str());

    ScopedLock guard(*_lock);
    FSResult<EntryMeta> lookup = GetEntryMeta(segmentPath);
    RETURN_IF_FS_ERROR(lookup.ec, "Mount segment failed [%s]", rawLogicalPath.c_str());

    const EntryMeta& segmentMeta = lookup.result;
    if (!segmentMeta.IsDir()) {
        AUTIL_LOG(ERROR, "Mount segment failed, path [%s] is not dir", rawLogicalPath.c_str());
        return FSEC_ERROR;
    }
    if (!segmentMeta.IsLazy()) {
        AUTIL_LOG(DEBUG, "Skip mount segment [%s] for it is not lazy", rawLogicalPath.c_str());
        return FSEC_OK;
    }

    const auto segmentMountType = segmentMeta.IsOwner() ? FSMT_READ_WRITE : FSMT_READ_ONLY;
    auto mountEc = _entryTableBuilder->MountSegment(segmentMeta.GetPhysicalRoot(), segmentMeta.GetPhysicalPath(),
                                                    segmentMeta.GetLogicalPath(), segmentMountType);
    RETURN_IF_FS_ERROR(mountEc, "Mount segment [%s] failed", rawLogicalPath.c_str());
    return FSEC_OK;
}
// Copies the physical data behind `logicalPath` into the output root and re-mounts the
// logical path onto the copy (read-write).
//
// Nothing is done when the path is already located in the output root, or when it does not
// exist and `mayNonExist` is true.
FSResult<void> LogicalFileSystem::CopyToOutputRoot(const std::string& logicalPath, bool mayNonExist) noexcept
{
    auto located = GetPhysicalPath(logicalPath);
    if (!located.OK()) {
        const bool toleratedMissing = (located.ec == FSEC_NOENT && mayNonExist);
        if (toleratedMissing) {
            AUTIL_LOG(INFO, "Logical Path [%s] not exist!", logicalPath.c_str());
            return FSEC_OK;
        }
        RETURN_IF_FS_ERROR(located.ec, "GetPhysicalPath [%s] failed", logicalPath.c_str());
    }

    const std::string& sourcePath = located.result;
    const std::string targetPath = PathUtil::JoinPath(_outputRoot, logicalPath);
    if (sourcePath == targetPath) {
        AUTIL_LOG(INFO, "Path [%s] already exist in OuputRoot [%s]", logicalPath.c_str(), _outputRoot.c_str());
        return FSEC_OK;
    }

    auto copyEc = FslibWrapper::Copy(sourcePath, targetPath).Code();
    RETURN_IF_FS_ERROR(copyEc, "Path [%s] copy to OuputRoot failed [%s]", logicalPath.c_str(), _outputRoot.c_str());

    // Point the logical path at the fresh copy, using the mount flavour matching its kind.
    auto [isDirEc, pathIsDir] = IsDir(logicalPath);
    RETURN_IF_FS_ERROR(isDirEc, "");
    if (!pathIsDir) {
        RETURN_IF_FS_ERROR(MountFile(_outputRoot, logicalPath, logicalPath, FSMT_READ_WRITE, -1, mayNonExist), "");
        return FSEC_OK;
    }
    RETURN_IF_FS_ERROR(MountDir(_outputRoot, logicalPath, logicalPath, FSMT_READ_WRITE, true), "");
    return FSEC_OK;
}
// Merges the content of `physicalDirs` into the physical location that backs `logicalPath`
// and mounts the result (read-write) at `logicalPath`.
//
// Destination: when `logicalPath` already exists in the entry table and is not mutable, the
// data goes below the entry table's patch root; otherwise below the output root.
//
// Two strategies are used:
//   * exactly one source and `logicalPath` does not exist yet: the source is renamed to the
//     destination in one step (a missing source is only warned about);
//   * otherwise each source is handled individually: a directory has its entries moved into
//     the destination, a plain file is moved itself.
//
// When `mergeDirsOption.mergePackageFiles` is set, MergePackageFiles() is run on the merged
// path afterwards. An empty `physicalDirs` is a no-op.
// Returns FSEC_OK on success or the first error encountered.
FSResult<void> LogicalFileSystem::MergeDirs(const std::vector<std::string>& physicalDirs,
                                            const std::string& logicalPath,
                                            const MergeDirsOption& mergeDirsOption) noexcept
{
    AUTIL_LOG(INFO, "merge dirs [%s] to [%s]", StringUtil::toString(physicalDirs).c_str(), logicalPath.c_str());
    if (physicalDirs.empty()) {
        return FSEC_OK;
    }
    auto [metaEc, dirEntryMeta] = _entryTable->GetEntryMetaMayLazy(logicalPath);
    std::string destPhysicalRoot =
        metaEc == FSEC_OK && !dirEntryMeta.IsMutable() ? _entryTable->GetPatchRoot() : _outputRoot;
    std::string destPhysicalFullPath = PathUtil::JoinPath(destPhysicalRoot, logicalPath);

    // A single source can simply be renamed, provided the logical path is not occupied yet.
    bool optimize = false;
    if (physicalDirs.size() == 1) {
        auto [existEc, exist] = IsExist(logicalPath);
        RETURN_IF_FS_ERROR(existEc, "merge dirs failed, isexist [%s] failed", logicalPath.c_str());
        optimize = !exist;
    }

    if (optimize) {
        // optimize for end build
        auto ec = FslibWrapper::Rename(physicalDirs[0], destPhysicalFullPath /*noFence*/).Code();
        if (ec == FSEC_NOENT) {
            AUTIL_LOG(WARN, "skip merge src dir [%s] for not exist", physicalDirs[0].c_str());
        } else {
            // Report the real rename target: it is below the patch root when the destination
            // entry is immutable, not necessarily below the output root.
            RETURN_IF_FS_ERROR(ec, "merge dirs failed, rename [%s] to [%s]", physicalDirs[0].c_str(),
                               destPhysicalFullPath.c_str());
        }
    } else {
        // normal for end Merge
        for (const std::string& physicalPath : physicalDirs) {
            auto isDirRet = FslibWrapper::IsDir(physicalPath);
            assert(isDirRet.Code() != FSEC_NOENT);
            if (isDirRet.OK() && isDirRet.result) {
                // Source is a directory: move each of its entries into the destination.
                fslib::EntryList entryList;
                auto ec = FslibWrapper::ListDir(physicalPath, entryList).Code();
                RETURN_IF_FS_ERROR(ec, "Merge dirs failed, list src dir [%s] failed", physicalPath.c_str());
                for (size_t i = 0; i < entryList.size(); ++i) {
                    // MoveFiles is given full paths, ListDir produced names relative to the source.
                    entryList[i].entryName = PathUtil::JoinPath(physicalPath, entryList[i].entryName);
                }
                ec = DirectoryMerger::MoveFiles(entryList, destPhysicalFullPath);
                RETURN_IF_FS_ERROR(ec, "merge dirs failed, move src dir [%s] to [%s]", physicalPath.c_str(),
                                   destPhysicalFullPath.c_str());
                continue;
            }
            RETURN_IF_FS_ERROR(isDirRet.Code(), "Merge dirs failed, check is dir [%s] failed", physicalPath.c_str());

            auto isFileRet = FslibWrapper::IsFile(physicalPath);
            assert(isFileRet.Code() != FSEC_NOENT);
            if (isFileRet.OK() && isFileRet.result) {
                // Source is a plain file: move the file itself.
                auto ret = FslibWrapper::IsExist(physicalPath);
                RETURN_IF_FS_ERROR(ret.ec, "merge dirs failed, is exist [%s]", physicalPath.c_str());
                if (ret.result) {
                    auto ec = DirectoryMerger::MoveFiles({{false, physicalPath}}, destPhysicalFullPath);
                    // Log the source actually being moved, not always the first one.
                    RETURN_IF_FS_ERROR(ec, "merge dirs failed, move files [%s] to [%s]", physicalPath.c_str(),
                                       destPhysicalFullPath.c_str());
                }
                continue;
            }
            RETURN_IF_FS_ERROR(isFileRet.Code(), "Merge dirs failed, check is file [%s] failed",
                               physicalPath.c_str());
        }
    }

    // mount merged dir and files
    auto ret = FslibWrapper::IsDir(destPhysicalFullPath);
    RETURN_IF_FS_ERROR(ret.ec, "Merge dirs failed, isdir[%s/%s]", destPhysicalRoot.c_str(), logicalPath.c_str());
    if (ret.result) {
        RETURN_IF_FS_ERROR(MountDir(destPhysicalRoot, logicalPath, logicalPath, FSMT_READ_WRITE, true), "");
    } else {
        RETURN_IF_FS_ERROR(MountFile(destPhysicalRoot, logicalPath, logicalPath, FSMT_READ_WRITE, -1, false), "");
    }
    if (mergeDirsOption.mergePackageFiles) {
        RETURN_IF_FS_ERROR(MergePackageFiles(logicalPath, mergeDirsOption.fenceContext), "");
    }
    return FSEC_OK;
}
// Merges the package files found in the physical directory backing `logicalPath` and brings
// the entry table up to date with the result.
//
// The physical destination is below the patch root when `logicalPath` exists and is not
// mutable, otherwise below the output root. After the physical merge:
//   * if the logical path exists, all its entries are dropped and the directory is re-mounted;
//   * if DirectoryMerger reported a merge, the directory is (re)created logically and mounted
//     as a package directory (a FSEC_NOENT from that mount is tolerated).
FSResult<void> LogicalFileSystem::MergePackageFiles(const std::string& logicalPath, FenceContext* fenceContext) noexcept
{
    FSResult<EntryMeta> dirLookup = _entryTable->GetEntryMeta(logicalPath);
    const EntryMeta& dirMeta = dirLookup.result;
    const std::string targetRoot =
        dirLookup.ec == FSEC_OK && !dirMeta.IsMutable() ? _entryTable->GetPatchRoot() : _outputRoot;
    const std::string targetFullPath = PathUtil::JoinPath(targetRoot, logicalPath);

    FSResult<bool> mergeRet = DirectoryMerger::MergePackageFiles(targetFullPath, fenceContext);
    RETURN_IF_FS_ERROR(mergeRet.ec, "merge package files in [%s] failed", targetFullPath.c_str());

    auto [existEc, logicalExists] = IsExist(logicalPath);
    RETURN_IF_FS_ERROR(existEc, "merge package files in [%s] failed, isexist [%s] failed", targetFullPath.c_str(),
                       logicalPath.c_str());
    if (logicalExists) {
        // remove useless moved package files in entryTable.
        auto staleMetas = _entryTable->ListDir(logicalPath, true);
        for (const auto& staleMeta : staleMetas) {
            _entryTable->Delete(staleMeta);
        }
        // re mount
        RETURN_IF_FS_ERROR(MountDir(targetRoot, logicalPath, logicalPath, FSMT_READ_WRITE, true), "");
    }

    if (!mergeRet.result) {
        return FSEC_OK;
    }
    RETURN_IF_FS_ERROR(MakeDirectory(logicalPath, DirectoryOption()), "MakeDirectory [%s]", logicalPath.c_str());
    auto packageEc = _entryTableBuilder->MountPackageDir(targetRoot, logicalPath, logicalPath, FSMT_READ_WRITE);
    if (packageEc != FSEC_NOENT) {
        RETURN_IF_FS_ERROR(packageEc, "merge package file in [%s] failed", targetFullPath.c_str());
    }
    return FSEC_OK;
}
// Creates the block memory quota controller and the input/output storages from the current
// options. Returns FSEC_ERROR when either storage cannot be created.
FSResult<void> LogicalFileSystem::InitStorage() noexcept
{
    AUTIL_LOG(INFO, "init storage, outputRoot [%s], options [%s]", _outputRoot.c_str(),
              _options->DebugString().c_str());

    // The V2 quota controller wins when present; otherwise the legacy one is expected to be set.
    if (!_options->memoryQuotaControllerV2) {
        assert(_options->memoryQuotaController);
        _memController.reset(new BlockMemoryQuotaController(_options->memoryQuotaController, "file_system"));
    } else {
        _memController.reset(new BlockMemoryQuotaController(_options->memoryQuotaControllerV2, "file_system"));
    }

    _inputStorage = Storage::CreateInputStorage(_options, _memController, _entryTable, _lock.get());
    if (_inputStorage == nullptr) {
        AUTIL_LOG(ERROR, "Create input storage failed! output root [%s]", _outputRoot.c_str());
        return FSEC_ERROR;
    }

    _outputStorage =
        Storage::CreateOutputStorage(_outputRoot, _options, _memController, _name, _entryTable, _lock.get());
    if (_outputStorage == nullptr) {
        AUTIL_LOG(ERROR, "Create output stoage failed, output root [%s]", _outputRoot.c_str());
        return FSEC_ERROR;
    }
    return FSEC_OK;
}
// Allocates an EntryMeta for a new file at `path` (already normalized by the caller).
//
// The parent of `path` must be an existing directory, otherwise FSEC_ERROR is returned with a
// default EntryMeta. The meta is only created here; adding it to the entry table is left to
// the caller.
FSResult<EntryMeta> LogicalFileSystem::CreateEntryMeta(const string& path) noexcept
{
    ScopedLock guard(*_lock);

    auto [parentEc, parentIsDir] = IsDir(PathUtil::GetParentDirPath(path));
    RETURN2_IF_FS_ERROR(parentEc, EntryMeta(), "Create file writer failed, IsDir parent of path [%s] failed",
                        path.c_str());
    if (!parentIsDir) {
        RETURN2_IF_FS_ERROR(FSEC_ERROR, EntryMeta(),
                            "Create file writer failed, Parent dir for file [%s] not exist or is not dir",
                            path.c_str());
    }

    auto created = _entryTable->CreateFile(path);
    if (created.Code() == FSEC_OK) {
        // A freshly created file entry is expected to be writable.
        assert(created.result.IsMutable());
        return created;
    }
    AUTIL_LOG(ERROR, "Create file writer failed, file [%s], ec[%d]", path.c_str(), created.Code());
    return created;
}
// Creates a writer for the logical file `rawPath`.
//
// - FSOT_SLICE and FSOT_MEM open types are delegated to CreateSliceFileWriter() and
//   CreateMemFileWriter() respectively; everything below only concerns the other open types.
// - If the file already exists, a writer is only handed out in append mode; otherwise
//   FSEC_EXIST is returned.
// - If the file does not exist, an entry is created and added to the entry table first
//   (flagged as a memory file when `noDump` is set). Should the storage then fail to create
//   the writer, that freshly added entry is removed again.
//
// Returns the writer created by the output storage, or an error code with a null writer.
FSResult<std::shared_ptr<FileWriter>> LogicalFileSystem::CreateFileWriter(const string& rawPath,
                                                                          const WriterOption& rawWriterOption) noexcept
{
    AUTIL_LOG(DEBUG, "file[%s], openType[%d]", rawPath.c_str(), rawWriterOption.openType);
    string path = NormalizeLogicalPath(rawPath);
    // Work on a copy so the metric group can be filled in from the path.
    WriterOption writerOption = rawWriterOption;
    writerOption.metricGroup = GetMetricGroup(path);
    if (writerOption.openType == FSOT_SLICE) {
        return CreateSliceFileWriter(path, writerOption);
    } else if (writerOption.openType == FSOT_MEM) {
        return CreateMemFileWriter(path, writerOption);
    }

    ScopedLock lock(*_lock);
    bool deleteWhenFail = false;
    string fullPhysicalPath;
    auto [ec, entryMeta] = GetEntryMeta(path);
    if (ec == FSEC_OK) {
        fullPhysicalPath = entryMeta.GetFullPhysicalPath();
        // FSOT_MEM never reaches this point (handled above), so an existing file can only be
        // reused when the caller asked for append mode.
        if (!writerOption.isAppendMode) {
            AUTIL_LOG(ERROR, "Create file [%s] => [%s] writer failed, already exist", rawPath.c_str(),
                      fullPhysicalPath.c_str());
            return {FSEC_EXIST, std::shared_ptr<FileWriter>()};
        }
    } else if (ec == FSEC_NOENT) {
        auto [ec1, entryMeta1] = CreateEntryMeta(path);
        RETURN2_IF_FS_ERROR(ec1, std::shared_ptr<FileWriter>(), "CreateEntryMeta failed, path[%s]", path.c_str());
        if (writerOption.noDump) { // noDump means pure memory file
            entryMeta1.SetIsMemFile(true);
        }
        auto addRet = _entryTable->AddEntryMeta(entryMeta1);
        RETURN2_IF_FS_ERROR(addRet.ec, std::shared_ptr<FileWriter>(), "entry table add file failed, [%s]",
                            entryMeta1.DebugString().c_str());
        fullPhysicalPath = entryMeta1.GetFullPhysicalPath();
        // The entry was added by this call, so it has to be rolled back if the writer fails.
        deleteWhenFail = true;
    } else {
        RETURN2_IF_FS_ERROR(ec, std::shared_ptr<FileWriter>(), "Create file [%s] writer failed", rawPath.c_str());
    }

    auto ret = _outputStorage->CreateFileWriter(path, fullPhysicalPath, writerOption);
    if (ret.ec != FSEC_OK) {
        AUTIL_LOG(ERROR, "Create file [%s] => [%s] writer failed", rawPath.c_str(), fullPhysicalPath.c_str());
        if (deleteWhenFail) {
            _entryTable->Delete(path);
        }
    }
    return ret;
}
// Picks the metric group for an already normalized path: FSMG_MEM when the path lies inside
// one of the configured `memMetricGroupPaths`, FSMG_LOCAL otherwise.
FSMetricGroup LogicalFileSystem::GetMetricGroup(const std::string& normPath) noexcept
{
    const auto& memRoots = _options->memMetricGroupPaths;
    const bool underMemRoot =
        std::any_of(std::begin(memRoots), std::end(memRoots), [&normPath](const std::string& memRoot) {
            return PathUtil::IsInRootPath(normPath, memRoot);
        });
    return underMemRoot ? FSMG_MEM : FSMG_LOCAL;
}
// Creates and opens a reader for the logical file `rawPath`.
//
// - FSOT_SLICE open types are delegated to CreateSliceFileReader().
// - The entry lookup and the reader construction happen under the file system lock; the
//   potentially slow Open() call is deliberately made after the lock has been released.
// - A copy of `readerOption` is completed with the file length/offset from the entry meta and
//   with the link root (only when the path was recognized as a link path).
// - With `forceRemotePath` the raw full physical path of the entry is used instead of the
//   regular full physical path.
//
// Returns the opened reader, or an error code with a null reader. A missing file is logged at
// DEBUG level instead of ERROR when `readerOption.mayNonExist` is set; the error code is
// returned in both cases.
FSResult<std::shared_ptr<FileReader>> LogicalFileSystem::CreateFileReader(const string& rawPath,
const ReaderOption& readerOption) noexcept
{
// isLink is filled in by NormalizeLogicalPath.
// NOTE(review): presumably true when rawPath addresses the file through the root link -- confirm.
bool isLink = false;
auto path = NormalizeLogicalPath(rawPath, &isLink);
AUTIL_LOG(DEBUG, "file[%s], openType[%d], isLink[%d]", rawPath.c_str(), readerOption.openType, isLink);
if (readerOption.openType == FSOT_SLICE) {
return CreateSliceFileReader(path, readerOption);
}
std::shared_ptr<FileReader> reader;
{
// Scope of the lock: entry lookup and reader creation only.
ScopedLock lock(*_lock);
auto [ec, entryMeta] = GetEntryMeta(path);
if (ec != FSEC_OK) {
if (ec == FSEC_NOENT && readerOption.mayNonExist) {
AUTIL_LOG(DEBUG, "get entry meta failed [%s]", rawPath.c_str());
} else {
AUTIL_LOG(ERROR, "get entry meta failed [%s]", rawPath.c_str());
}
return {ec, std::shared_ptr<FileReader>()};
}
// The caller's option is left untouched; the copy carries the entry specific fields.
ReaderOption newReaderOption(readerOption);
newReaderOption.fileLength = entryMeta.GetLength();
newReaderOption.fileOffset = entryMeta.GetOffset();
newReaderOption.linkRoot = isLink ? _rootLinkPath : "";
auto storage = GetStorage(entryMeta, isLink);
std::string physicalPath = entryMeta.GetFullPhysicalPath();
if (readerOption.forceRemotePath) {
physicalPath = entryMeta.GetRawFullPhysicalPath();
}
// logical path should contain linkroot, for inside/outside cache & CompressFileAddressMapper check
auto [ec2, fileReader] = storage->CreateFileReader(/*logicalFilePath*/ PathUtil::NormalizePath(rawPath),
physicalPath, newReaderOption);
RETURN2_IF_FS_ERROR(ec2, std::shared_ptr<FileReader>(), "Open file [%s] for reading failed, address [%s]",
path.c_str(), entryMeta.GetFullPhysicalPath().c_str());
reader = fileReader;
}
assert(reader);
// avoid lock, Open need populate data will take much time
RETURN2_IF_FS_ERROR(reader->Open(), std::shared_ptr<FileReader>(), "reader open failed, file[%s]", rawPath.c_str());
reader->InitMetricReporter(GetFileSystemMetricsReporter());
return {FSEC_OK, reader};
}
// Returns the resource file at `rawPath`, creating it when it does not exist yet.
//
// A newly created resource file is flagged as a memory file. Its entry is added to the entry
// table only for online (non-offline) file systems. A clean callback is installed on the file
// that removes the path from the entry table under the file system lock.
FSResult<ResourceFilePtr> LogicalFileSystem::CreateResourceFile(const string& rawPath) noexcept
{
    const string path = NormalizeLogicalPath(rawPath);
    AUTIL_LOG(DEBUG, "File[%s]", rawPath.c_str());
    ScopedLock guard(*_lock);

    // Reuse an already existing resource file when there is one.
    auto [lookupEc, existingFile] = GetResourceFile(path);
    if (lookupEc == FSEC_OK && existingFile != nullptr) {
        return {FSEC_OK, existingFile};
    }
    RETURN2_IF_FS_ERROR(lookupEc, ResourceFilePtr(), "GetResourceFile [%s] failed", rawPath.c_str());

    auto [metaEc, entryMeta] = CreateEntryMeta(path);
    RETURN2_IF_FS_ERROR(metaEc, ResourceFilePtr(), "CreateEntryMeta failed, path[%s]", path.c_str());
    entryMeta.SetIsMemFile(true);

    auto created = _outputStorage->CreateResourceFile(path, entryMeta.GetFullPhysicalPath(), GetMetricGroup(path));
    RETURN2_IF_FS_ERROR(created.ec, ResourceFilePtr(), "Create ResourceFile [%s] => [%s] failed", rawPath.c_str(),
                        entryMeta.GetFullPhysicalPath().c_str());

    if (!_options->isOffline) {
        auto addEc = _entryTable->AddEntryMeta(entryMeta).Code();
        RETURN2_IF_FS_ERROR(addEc, ResourceFilePtr(), "Entry table add InMemoryFile failed, [%s]",
                            entryMeta.DebugString().c_str());
    }

    if (created.result) {
        // Set Clean callback
        created.result->SetFileNodeCleanCallback([this, path] {
            ScopedLock cleanGuard(*_lock);
            _entryTable->Delete(path);
        });
        created.result->UpdateFileLengthMetric(GetFileSystemMetricsReporter());
    }
    return {FSEC_OK, created.result};
}
// Looks up an existing resource file at `rawPath`.
//
// For compatibility, a path that cannot be found in the entry table (any lookup error) is
// reported as FSEC_OK with a null file. An entry that is not mutable is an error
// (FSEC_ERROR). Otherwise the result of the output storage lookup is returned.
FSResult<ResourceFilePtr> LogicalFileSystem::GetResourceFile(const string& rawPath) const noexcept
{
    const string path = NormalizeLogicalPath(rawPath);
    AUTIL_LOG(DEBUG, "File[%s]", rawPath.c_str());
    ScopedLock guard(*_lock);

    auto [metaEc, entryMeta] = _entryTable->GetEntryMeta(path);
    if (metaEc != FSEC_OK) {
        return {FSEC_OK, ResourceFilePtr()}; // for compitable
    }
    if (!entryMeta.IsMutable()) {
        AUTIL_LOG(ERROR, "Unexpected mutable state, path:[%s].", rawPath.c_str());
        return {FSEC_ERROR, ResourceFilePtr()};
    }

    auto [storageEc, storedFile] = _outputStorage->GetResourceFile(path);
    RETURN2_IF_FS_ERROR(storageEc, ResourceFilePtr(), "get ResourceFile [%s] failed", rawPath.c_str());
    return {FSEC_OK, storedFile};
}
// Creates the logical directory `rawPath`.
//
// - The logical root (empty normalized path) always succeeds without doing anything.
// - If the path already exists: non-recursive calls fail with FSEC_EXIST, an existing file
//   also yields FSEC_EXIST, and an existing directory is passed on to the output storage again.
// - Otherwise the entry table allocates the metas for the directories to create, the output
//   storage creates the deepest one physically, and the metas are then added to the entry table.
FSResult<void> LogicalFileSystem::MakeDirectory(const std::string& rawPath,
                                                const DirectoryOption& directoryOption) noexcept
{
    const string path = NormalizeLogicalPath(rawPath);
    AUTIL_LOG(INFO, "Make dir [%s], recursive[%d]", rawPath.c_str(), directoryOption.recursive);
    if (unlikely(path.empty())) {
        return FSEC_OK;
    }

    ScopedLock guard(*_lock);
    auto [status, existingMeta] = GetEntryMeta(path);
    if (status == FSEC_OK) {
        // The path is already known to the entry table.
        if (!directoryOption.recursive) {
            AUTIL_LOG(ERROR, "Make dir [%s] failed, already exist", rawPath.c_str());
            return FSEC_EXIST;
        }
        if (!existingMeta.IsDir()) {
            AUTIL_LOG(ERROR, "Make dir [%s] failed, is a file", rawPath.c_str());
            return FSEC_EXIST;
        }
        status = _outputStorage->MakeDirectory(path, existingMeta.GetFullPhysicalPath(), directoryOption.recursive,
                                               directoryOption.packageHint);
        RETURN_IF_FS_ERROR(status, "Make dir [%s] failed, physical path [%s], recursive [%d]", path.c_str(),
                           existingMeta.GetFullPhysicalPath().c_str(), directoryOption.recursive);
        return FSEC_OK;
    }
    if (status != FSEC_NOENT) {
        RETURN_IF_FS_ERROR(status, "Make dir [%s] failed", rawPath.c_str());
    }

    // Allocate EntryMeta
    std::vector<EntryMeta> newMetas;
    status = _entryTable->MakeDirectory(path, directoryOption.recursive, &newMetas);
    if (status == FSEC_EXIST) {
        AUTIL_LOG(ERROR, "make dir [%s] failed, already exist", rawPath.c_str());
        return FSEC_EXIST;
    }
    if (status == FSEC_NOENT) {
        AUTIL_LOG(ERROR, "make dir [%s] failed, parent dir not exist", rawPath.c_str());
        return FSEC_NOENT;
    }
    if (unlikely(status != FSEC_OK)) {
        AUTIL_LOG(ERROR, "make dir [%s] failed, unexpected error [%d]", rawPath.c_str(), status);
        return FSEC_ERROR;
    }

    if (!newMetas.empty()) {
        // Make directory in physical storage
        status = _outputStorage->MakeDirectory(path, newMetas.back().GetFullPhysicalPath(), directoryOption.recursive,
                                               directoryOption.packageHint);
        RETURN_IF_FS_ERROR(status, "Make dir [%s] failed, physical path [%s], recursive [%d]", path.c_str(),
                           newMetas.back().GetFullPhysicalPath().c_str(), directoryOption.recursive);
    }
    // Publish the new directories in the entry table only after the physical step succeeded.
    for (auto& newMeta : newMetas) {
        status = _entryTable->AddEntryMeta(newMeta).ec;
        RETURN_IF_FS_ERROR(status, "Make dir [%s] failed, entryMeta[%s]", path.c_str(),
                           newMeta.DebugString().c_str());
    }
    return FSEC_OK;
}
// Removes the logical file `rawPath`.
//
// - Removing the logical root is rejected with FSEC_BADARGS.
// - Pending writes are flushed first (Sync(true)) so that a file still being flushed is
//   removed as well.
// - The file system lock is held for the operation unless `removeOption.relaxedConsistency`
//   is set, in which case it is released right after being taken.
// - A missing file is success when `removeOption.mayNonExist` is set; any other lookup
//   failure is returned to the caller.
// - Directories are rejected with FSEC_ISDIR.
// - Files not owned by this file system are only dropped from the entry table; owned files
//   are also removed from the storage(s). FSEC_NOENT from a storage is tolerated.
FSResult<void> LogicalFileSystem::RemoveFile(const string& rawPath, const RemoveOption& removeOption) noexcept
{
    AUTIL_LOG(INFO, "Remove File[%s] in [%s] mayNonExist[%d]", rawPath.c_str(), _outputRoot.c_str(),
              removeOption.mayNonExist);
    bool isLink = false;
    string path = NormalizeLogicalPath(rawPath, &isLink);
    if (unlikely(path.empty())) {
        AUTIL_LOG(ERROR, "Remove root dir is forbidden");
        return FSEC_BADARGS;
    }
    // make sure to remove the file in flushing
    RETURN_IF_FS_ERROR(Sync(true).Code(), "sync [%s] true failed", rawPath.c_str());
    std::unique_lock<autil::RecursiveThreadMutex> lock(*_lock);
    if (removeOption.relaxedConsistency) {
        lock.unlock();
    }

    auto [ec, entryMeta] = GetEntryMeta(path);
    if (ec == FSEC_NOENT && removeOption.mayNonExist) {
        return FSEC_OK;
    }
    // Every remaining lookup failure is fatal, not only FSEC_NOENT: continuing would operate
    // on a meta that does not describe the requested file.
    RETURN_IF_FS_ERROR(ec, "Get EntryMeta failed, path [%s]", rawPath.c_str());

    if (entryMeta.IsDir()) {
        AUTIL_LOG(ERROR, "Remove file [%s] failed, is directory!", rawPath.c_str());
        return FSEC_ISDIR;
    }
    string fullPhysicalPath = entryMeta.GetFullPhysicalPath();
    if (!entryMeta.IsOwner()) {
        AUTIL_LOG(DEBUG, "Remove file [%s] not owned.", path.c_str());
    } else if (isLink) {
        // A link path is removed from both storages.
        auto ec = _inputStorage->RemoveFile(path, fullPhysicalPath, removeOption.fenceContext);
        if (unlikely(ec != FSEC_OK && ec != FSEC_NOENT)) {
            RETURN_IF_FS_ERROR(ec, "Remove file [%s] => [%s] failed", rawPath.c_str(), fullPhysicalPath.c_str());
        }
        ec = _outputStorage->RemoveFile(path, fullPhysicalPath, removeOption.fenceContext);
        if (unlikely(ec != FSEC_OK && ec != FSEC_NOENT)) {
            RETURN_IF_FS_ERROR(ec, "Remove file [%s] => [%s] failed", rawPath.c_str(), fullPhysicalPath.c_str());
        }
    } else {
        auto storage = GetStorage(entryMeta, isLink);
        // meta for slice file will be released after storage call RemoveFile and becomes invalid.
        auto ec = storage->RemoveFile(path, fullPhysicalPath, removeOption.fenceContext);
        if (ec != FSEC_OK && ec != FSEC_NOENT) {
            RETURN_IF_FS_ERROR(ec, "Remove file [%s] => [%s] failed", rawPath.c_str(), fullPhysicalPath.c_str());
        }
    }
    _entryTable->Delete(path);
    return FSEC_OK;
}
// Remove the logical directory rawPath: delete the directory entry and everything listed under it from the
// entry table, then (unless removeOption.logicalDelete is set) remove the matching physical directories
// through the output storage.
//
// Returns FSEC_BADARGS when the path normalizes to the root, FSEC_OK when the entry is missing and
// removeOption.mayNonExist is set, FSEC_NOTDIR when the entry is not a directory, and otherwise the first
// failing error code.
FSResult<void> LogicalFileSystem::RemoveDirectory(const std::string& rawPath, const RemoveOption& removeOption) noexcept
{
AUTIL_LOG(INFO, "Remove directory [%s] in [%s], mayNonExist[%d]", rawPath.c_str(), _outputRoot.c_str(),
removeOption.mayNonExist);
string path = NormalizeLogicalPath(rawPath);
if (unlikely(path.empty())) {
AUTIL_LOG(ERROR, "Remove root dir is forbidden");
return FSEC_BADARGS;
}
// make sure to remove the file in flushing
RETURN_IF_FS_ERROR(Sync(true).Code(), "sync [%s] true failed", rawPath.c_str());
std::unique_lock<autil::RecursiveThreadMutex> lock(*_lock);
auto [ec, removeRootMeta] = GetEntryMeta(path);
if (ec == FSEC_NOENT && removeOption.mayNonExist) {
return FSEC_OK;
}
RETURN_IF_FS_ERROR(ec, "Get EntryMeta failed, path [%s]", rawPath.c_str());
if (!removeRootMeta.IsDir()) {
AUTIL_LOG(ERROR, "Remove directory [%s] failed. not a directory", rawPath.c_str());
return FSEC_NOTDIR;
}
// Recursive listing of the directory, with the directory's own meta put in front so it is handled too.
auto entryMetas = _entryTable->ListDir(path, true);
entryMetas.emplace(entryMetas.begin(), removeRootMeta);
string removeRootPhysicalPath = removeRootMeta.GetPhysicalPath();
// Distinct physical roots of the entries that are owned and not stored inside a package.
std::unordered_set<string> removeRoots;
for (const auto& entryMeta : entryMetas) {
if (entryMeta.IsOwner() && !entryMeta.IsInPackage()) {
removeRoots.insert(entryMeta.GetPhysicalRoot());
} else {
// inherit from other builder or merger
AUTIL_LOG(DEBUG, "Remove directory [%s] which is not owner", entryMeta.GetFullPhysicalPath().c_str());
}
// TODO: maybe we can optimize
_entryTable->Delete(entryMeta);
}
ec = _entryTable->OptimizeMemoryStructures();
RETURN_IF_FS_ERROR(ec, "Remove directory [%s] failed", path.c_str());
// With relaxedConsistency the lock is released here, i.e. after the entry table has been updated and
// before any physical removal below.
if (removeOption.relaxedConsistency) {
lock.unlock();
}
if (removeOption.logicalDelete) {
AUTIL_LOG(INFO, "remove directory [%s] logical, not remove physical path", rawPath.c_str());
return FSEC_OK;
}
// Remove <root>/<physical path of the directory> under every collected root; a missing physical
// directory is tolerated only when mayNonExist is set.
for (const string& removeRoot : removeRoots) {
auto removePath = PathUtil::JoinPath(removeRoot, removeRootPhysicalPath);
auto ec = _outputStorage->RemoveDirectory(path, removePath, removeOption.fenceContext);
if (ec == FSEC_OK || (ec == FSEC_NOENT && removeOption.mayNonExist)) {
continue;
}
RETURN_IF_FS_ERROR(ec, "Remove directory [%s] => [%s] failed", path.c_str(), removePath.c_str());
}
return FSEC_OK;
}
// Tell whether the logical path exists.
// The root (empty normalized path) always exists. A path given through the root link must additionally
// exist physically under the output root before the entry table is consulted.
FSResult<bool> LogicalFileSystem::IsExist(const string& rawPath) const noexcept
{
    bool viaLink = false;
    const string path = NormalizeLogicalPath(rawPath, &viaLink);
    if (path.empty()) {
        return {FSEC_OK, true};
    }
    if (viaLink) {
        const string physicalPath = PathUtil::JoinPath(_outputRoot, path);
        auto physicalRet = FslibWrapper::IsExist(physicalPath);
        if (physicalRet.ec != FSEC_OK) {
            AUTIL_LOG(ERROR, "check file IsExist [%s] failed, ec [%d]", physicalPath.c_str(), physicalRet.ec);
            return {physicalRet.ec, false};
        }
        if (!physicalRet.result) {
            return {FSEC_OK, false};
        }
    }
    ScopedLock lock(*_lock);
    FSResult<EntryMeta> metaRet = GetEntryMeta(path);
    if (metaRet.ec == FSEC_OK) {
        return {FSEC_OK, true};
    }
    if (metaRet.ec == FSEC_NOENT) {
        return {FSEC_OK, false};
    }
    // Every other failure is reported as a generic error.
    AUTIL_LOG(ERROR, "get entry meta [%s] failed, ec [%d]", rawPath.c_str(), metaRet.ec);
    return {FSEC_ERROR, false};
}
// Tell whether the logical path is a directory.
// The root is always a directory; a missing entry yields {FSEC_OK, false}.
FSResult<bool> LogicalFileSystem::IsDir(const string& rawPath) const noexcept
{
    const string path = NormalizeLogicalPath(rawPath);
    if (path.empty()) {
        return {FSEC_OK, true};
    }
    ScopedLock lock(*_lock);
    FSResult<EntryMeta> metaRet = GetEntryMeta(path);
    if (metaRet.ec == FSEC_NOENT) {
        return {FSEC_OK, false};
    }
    if (metaRet.ec != FSEC_OK) {
        AUTIL_LOG(ERROR, "Get entry meta [%s] failed", rawPath.c_str());
        return {metaRet.ec, false};
    }
    return {FSEC_OK, metaRet.result.IsDir()};
}
// List the names under a logical directory, relative to that directory, appending them to fileList.
// Entries whose extension equals TEMP_FILE_SUFFIX are skipped; in recursive mode directory names are passed
// through PathUtil::NormalizeDir.
// For a lazy directory, a non-recursive listing also lists the physical directory directly (the result is
// then sorted and de-duplicated), while a recursive listing mounts the directory into the entry table first.
FSResult<void> LogicalFileSystem::ListDir(const std::string& rawPath, const ListOption& listOption,
                                          std::vector<std::string>& fileList) const noexcept
{
    AUTIL_LOG(DEBUG, "Dir[%s], recursive[%d]", rawPath.c_str(), listOption.recursive);
    string path = NormalizeLogicalPath(rawPath);
    ScopedLock lock(*_lock);
    auto [dirEc, dirMeta] = GetEntryMeta(path);
    RETURN_IF_FS_ERROR(dirEc, "List dir [%s] failed", rawPath.c_str());
    if (!dirMeta.IsDir()) {
        AUTIL_LOG(ERROR, "List dir [%s] failed, not a dir!", rawPath.c_str());
        return FSEC_NOTDIR;
    }
    if (dirMeta.IsLazy()) {
        if (listOption.recursive) {
            auto mountEc = _entryTableBuilder->MountDirRecursive(
                dirMeta.GetPhysicalRoot(), path, path, dirMeta.IsOwner() ? FSMT_READ_WRITE : FSMT_READ_ONLY);
            if (mountEc != FSEC_NOENT) {
                RETURN_IF_FS_ERROR(mountEc, "Mount dir [%s] failed", dirMeta.GetFullPhysicalPath().c_str());
            }
        } else {
            auto listEc = FslibWrapper::ListDir(dirMeta.GetFullPhysicalPath(), fileList).Code();
            RETURN_IF_FS_ERROR(listEc, "List dir [%s] failed", dirMeta.GetFullPhysicalPath().c_str());
            fileList.erase(std::remove_if(fileList.begin(), fileList.end(), FslibWrapper::IsTempFile),
                           fileList.end());
            // we do not want to get length to complete the entry table
        }
    }
    auto metas = _entryTable->ListDir(path, listOption.recursive);
    for (const auto& meta : metas) {
        auto relativePath = meta.GetLogicalPath();
        const auto dotPos = relativePath.rfind('.');
        if (dotPos != string::npos && relativePath.substr(dotPos) == TEMP_FILE_SUFFIX) {
            continue; // temp file, not reported
        }
        if (!path.empty()) {
            // strip "<path>/" so the name is relative to the listed directory
            relativePath = relativePath.substr(path.size() + 1);
        }
        if (listOption.recursive && meta.IsDir()) {
            relativePath = PathUtil::NormalizeDir(relativePath);
        }
        fileList.emplace_back(relativePath);
    }
    if (dirMeta.IsLazy() && !listOption.recursive) {
        // the physical listing and the entry table may report the same name
        std::sort(fileList.begin(), fileList.end());
        fileList.erase(std::unique(fileList.begin(), fileList.end()), fileList.end());
    }
    return FSEC_OK;
}
// List the entries under a logical directory as FileInfo records (path relative to the directory, plus the
// length for regular files), appending them to fileInfos.
// A lazy directory is mounted recursively into the entry table first. Entries whose extension equals
// TEMP_FILE_SUFFIX are skipped.
FSResult<void> LogicalFileSystem::ListDir(const std::string& rawPath, const ListOption& listOption,
                                          std::vector<FileInfo>& fileInfos) const noexcept
{
    AUTIL_LOG(DEBUG, "Dir[%s], recursive[%d]", rawPath.c_str(), listOption.recursive);
    string path = NormalizeLogicalPath(rawPath);
    ScopedLock lock(*_lock);
    auto [dirEc, dirMeta] = GetEntryMeta(path);
    RETURN_IF_FS_ERROR(dirEc, "List dir [%s] failed", rawPath.c_str());
    if (!dirMeta.IsDir()) {
        AUTIL_LOG(ERROR, "List dir [%s] failed, not a dir!", rawPath.c_str());
        return FSEC_NOTDIR;
    }
    if (dirMeta.IsLazy()) {
        auto mountEc = _entryTableBuilder->MountDirRecursive(dirMeta.GetPhysicalRoot(), path, path,
                                                             dirMeta.IsOwner() ? FSMT_READ_WRITE : FSMT_READ_ONLY);
        if (mountEc != FSEC_NOENT) {
            RETURN_IF_FS_ERROR(mountEc, "mount dir failed. root[%s] path[%s] to [%s]",
                               dirMeta.GetPhysicalRoot().c_str(), path.c_str(), path.c_str());
        }
    }
    auto metas = _entryTable->ListDir(path, listOption.recursive);
    fileInfos.reserve(metas.size());
    for (const auto& meta : metas) {
        auto relativePath = meta.GetLogicalPath();
        const auto dotPos = relativePath.rfind('.');
        if (dotPos != string::npos && relativePath.substr(dotPos) == TEMP_FILE_SUFFIX) {
            continue; // temp file, not reported
        }
        if (!path.empty()) {
            // strip "<path>/" so the name is relative to the listed directory
            relativePath = relativePath.substr(path.size() + 1);
        }
        if (listOption.recursive && meta.IsDir()) {
            relativePath = PathUtil::NormalizeDir(relativePath);
        }
        FileInfo info;
        info.filePath = relativePath;
        if (meta.IsFile()) {
            info.fileLength = meta.GetLength();
        }
        fileInfos.emplace_back(info);
    }
    return FSEC_OK;
}
// ListPhysicalFile should get all physical files of rawPath from entry-table directly, instead of
// using e.g. entry-meta to get package data then find corresponding package meta name.
//
// The directory is first mounted recursively into the entry table, then every listed entry is translated
// into FileInfo records appended to fileInfos:
//   - in-memory files and entries whose physical path extension equals TEMP_FILE_SUFFIX are skipped;
//   - an entry inside a package contributes its package data file once and, for non-directory entries, the
//     matching package meta file once (the `filter` set does the de-duplication);
//   - any other entry contributes its own physical path, with length -1 for directories.
// Returns FSEC_NOTDIR when rawPath is not a directory, otherwise the first failing error code or FSEC_OK.
FSResult<void> LogicalFileSystem::ListPhysicalFile(const std::string& rawPath, const ListOption& listOption,
std::vector<FileInfo>& fileInfos) noexcept
{
AUTIL_LOG(DEBUG, "Dir[%s], recursive[%d]", rawPath.c_str(), listOption.recursive);
string path = NormalizeLogicalPath(rawPath);
ScopedLock lock(*_lock);
auto [ec, entryMeta] = GetEntryMeta(path);
RETURN_IF_FS_ERROR(ec, "List dir [%s] failed", rawPath.c_str());
if (!entryMeta.IsDir()) {
AUTIL_LOG(ERROR, "List dir [%s] failed, not a dir!", rawPath.c_str());
return FSEC_NOTDIR;
}
// Mount unconditionally (not only for lazy directories); FSEC_NOENT from the mount is tolerated.
ec = _entryTableBuilder->MountDirRecursive(entryMeta.GetPhysicalRoot(), path, path,
entryMeta.IsOwner() ? FSMT_READ_WRITE : FSMT_READ_ONLY);
if (ec != FSEC_NOENT) {
RETURN_IF_FS_ERROR(ec, "mount dir failed. root[%s] path[%s] to [%s]", entryMeta.GetPhysicalRoot().c_str(),
path.c_str(), path.c_str());
}
// Relative physical paths of the package files that have already been reported.
std::unordered_set<std::string> filter;
auto metas = _entryTable->ListDir(path, listOption.recursive);
fileInfos.reserve(metas.size());
for (const auto& meta : metas) {
if (meta.IsMemFile()) {
continue;
}
auto physicalPath = meta.GetPhysicalPath();
auto pos = physicalPath.rfind('.');
if (pos == string::npos || physicalPath.substr(pos) != TEMP_FILE_SUFFIX) {
if (!path.empty()) {
// NOTE(review): strips path.size() + 1 leading characters, i.e. presumably assumes the physical
// path starts with "<logical path>/" - confirm.
physicalPath = physicalPath.substr(path.size() + 1);
}
if (meta.IsInPackage() && filter.find(physicalPath) == filter.end()) {
filter.insert(physicalPath);
auto [ec, packDataFileLen] = _entryTable->GetPackageFileLength(meta.GetRawFullPhysicalPath());
RETURN_IF_FS_ERROR(ec, "GetPackageFileLength [%s] failed", meta.GetRawFullPhysicalPath().c_str());
fileInfos.emplace_back(physicalPath, packDataFileLen);
// dir is skipped because dir's physical path points to package meta file path.
if (meta.IsDir()) {
continue;
}
auto packMetaPath = GetPackageMetaFilePath(physicalPath);
if (filter.find(packMetaPath) == filter.end()) {
filter.insert(packMetaPath);
// The length lookup uses the meta file path under the raw physical root, while the reported
// name (packMetaPath) is relative to the listed directory.
std::string packMetaFilePath =
PathUtil::JoinPath(meta.GetRawPhysicalRoot(), GetPackageMetaFilePath(meta.GetPhysicalPath()));
auto [ec, packMetaFileLen] = _entryTable->GetPackageFileLength(packMetaFilePath);
RETURN_IF_FS_ERROR(ec, "GetPackageFileLength [%s] failed", packMetaFilePath.c_str());
fileInfos.emplace_back(packMetaPath, packMetaFileLen);
}
} else if (!meta.IsInPackage()) {
if (listOption.recursive && meta.IsDir()) {
physicalPath = PathUtil::NormalizeDir(physicalPath);
}
fileInfos.emplace_back(physicalPath, meta.IsDir() ? -1 : meta.GetLength());
}
}
}
return FSEC_OK;
}
// Return the length recorded in the entry table for the logical path; a directory reports length 0.
FSResult<size_t> LogicalFileSystem::GetFileLength(const std::string& rawPath) const noexcept
{
    const string path = NormalizeLogicalPath(rawPath);
    ScopedLock lock(*_lock);
    FSResult<EntryMeta> metaRet = GetEntryMeta(path);
    RETURN2_IF_FS_ERROR(metaRet.ec, 0, "Get file [%s] length failed, get entry meta failed", rawPath.c_str());
    const EntryMeta& meta = metaRet.result;
    if (meta.IsDir()) {
        return {FSEC_OK, 0};
    }
    assert(meta.GetLength() >= 0);
    return {FSEC_OK, (size_t)meta.GetLength()};
}
// Test helper: fill fileStat from the entry meta of rawPath (directory flag, length, package flag, offset,
// physical root/path) and then let the owning storage add its own fields.
// Calling it on a directory trips an assert in debug builds.
FSResult<void> LogicalFileSystem::TEST_GetFileStat(const std::string& rawPath, FileStat& fileStat) const noexcept
{
    bool viaLink = false;
    const string path = NormalizeLogicalPath(rawPath, &viaLink);
    ScopedLock lock(*_lock);
    FSResult<EntryMeta> metaRet = GetEntryMeta(path);
    RETURN_IF_FS_ERROR(metaRet.ec, "get file [%s] entry meta failed", rawPath.c_str());
    const EntryMeta& meta = metaRet.result;
    if (!meta.IsDir()) {
        fileStat.isDirectory = false;
        FSResult<size_t> lengthRet = GetFileLength(path);
        RETURN_IF_FS_ERROR(lengthRet.ec, "GetFileLength [%s] failed", path.c_str());
        fileStat.fileLength = lengthRet.result;
    } else {
        assert(false);
        fileStat.isDirectory = true;
    }
    fileStat.inPackage = meta.IsInPackage();
    fileStat.offset = meta.GetOffset();
    fileStat.physicalRoot = meta.GetPhysicalRoot();
    fileStat.physicalPath = meta.GetPhysicalPath();
    Storage* storage = GetStorage(meta, viaLink);
    storage->TEST_GetFileStat(PathUtil::NormalizePath(rawPath), fileStat);
    return FSEC_OK;
}
// Trigger a sync of the output storage and return a shared future for it.
// Calls are serialized by _syncLock and each call first waits for the future stored by the previous one.
// When waitFinish is true the call also waits for _outputStorage->WaitSyncFinish() and reports its error.
// Note that _future is only updated on the success path.
FSResult<std::shared_future<bool>> LogicalFileSystem::Sync(bool waitFinish) noexcept
{
ScopedLock lock(_syncLock);
if (_future.valid()) {
_future.wait();
}
std::future<bool> retFuture;
{
// TODO(xingwo) fix dead lock
// The file-system lock is held only while the storage sync is started, not while waiting below.
ScopedLock innerLock(*_lock);
auto [ec, future] = _outputStorage->Sync();
RETURN2_IF_FS_ERROR(ec, std::move(future), "Sync failed");
retFuture = std::move(future);
}
if (waitFinish) {
// Attention: future.wait() is not equal to MemStorage.WaitDumpQueueEmpty(): the latter will rethrow
// exceptions that occur in the async flush thread, but future.wait() will not
auto ec = _outputStorage->WaitSyncFinish();
if (ec != FSEC_OK) {
RETURN2_IF_FS_ERROR(ec, retFuture.share(), "WaitSyncFinish failed");
}
}
_future = retFuture.share();
return {FSEC_OK, _future};
}
// Flush the package of the given logical directory through the output storage.
// This is a no-op (FSEC_OK) unless the configured output storage type is FSST_PACKAGE_MEM.
FSResult<void> LogicalFileSystem::FlushPackage(const string& rawLogicalDirPath) noexcept
{
    if (_options->outputStorage == FSST_PACKAGE_MEM) {
        bool viaLink = false;
        auto dirPath = NormalizeLogicalPath(rawLogicalDirPath, &viaLink);
        assert(!viaLink);
        ScopedLock lock(*_lock);
        return _outputStorage->FlushPackage(dirPath);
    }
    return FSEC_OK;
}
// Commit the package through the output storage while holding the file-system lock.
// Asserts that the configured output storage type is FSST_PACKAGE_DISK.
FSResult<int32_t> LogicalFileSystem::CommitPackage() noexcept
{
    assert(_options->outputStorage == FSST_PACKAGE_DISK);
    ScopedLock lock(*_lock);
    auto commitRet = _outputStorage->CommitPackage();
    return commitRet;
}
// Recover the package state of a logical directory from `checkpoint` through the output storage.
// physicalFileListHint is forwarded to the storage unchanged.
// Asserts that the output storage type is FSST_PACKAGE_DISK and that the directory's physical path equals
// its logical path.
FSResult<void> LogicalFileSystem::RecoverPackage(int32_t checkpoint, const std::string& rawLogicalDirPath,
                                                 const std::vector<std::string>& physicalFileListHint) noexcept
{
    assert(_options->outputStorage == FSST_PACKAGE_DISK);
    const std::string dirPath = NormalizeLogicalPath(rawLogicalDirPath);
    AUTIL_LOG(INFO, "Begin recover dir [%s] with [%d] in [%s]", dirPath.c_str(), checkpoint, _outputRoot.c_str());
    assert(GetEntryMeta(dirPath).result.GetPhysicalPath() == GetEntryMeta(dirPath).result.GetLogicalPath());
    auto recoverEc = _outputStorage->RecoverPackage(checkpoint, dirPath, physicalFileListHint);
    RETURN_IF_FS_ERROR(recoverEc, "Recover [%s] with checkpoint [%d] failed, [%lu] files", dirPath.c_str(),
                       checkpoint, physicalFileListHint.size());
    AUTIL_LOG(INFO, "End recover dir [%s] with [%d] in [%s]", dirPath.c_str(), checkpoint, _outputRoot.c_str());
    return FSEC_OK;
}
// Build a FileSystemMetrics from the metrics of the input and the output storage, under the lock.
FileSystemMetrics LogicalFileSystem::GetFileSystemMetrics() const noexcept
{
    ScopedLock lock(*_lock);
    return FileSystemMetrics(_inputStorage->GetMetrics(), _outputStorage->GetMetrics());
}
// Push the current file-system metrics to the metrics reporter and, when a memory controller is set, the
// quota it reports as used.
void LogicalFileSystem::ReportMetrics() noexcept
{
    ScopedLock lock(*_lock);
    _metricsReporter->ReportMetrics(GetFileSystemMetrics());
    if (!_memController) {
        return;
    }
    _metricsReporter->ReportMemoryQuotaUse(_memController->GetUsedQuota());
}
bool LogicalFileSystem::IsExistInCache(const std::string& rawPath) const noexcept
{
bool isLink = false;
string path = NormalizeLogicalPath(rawPath, &isLink);
ScopedLock lock(*_lock);
FSResult<EntryMeta> entryMetaRet = _entryTable->GetEntryMeta(path);
if (entryMetaRet.ec != FSEC_OK) {
return false;
}
auto storage = GetStorage(entryMetaRet.result, isLink);
return storage->IsExistInCache(PathUtil::NormalizePath(rawPath));
}
// Ask the storage serving rawPath which file type opening it with openType would produce.
// Fails with FSEC_ERROR when the path is not a regular file.
FSResult<FSFileType> LogicalFileSystem::DeduceFileType(const std::string& rawPath, FSOpenType openType) noexcept
{
    AUTIL_LOG(DEBUG, "File[%s], OpenType[%d]", rawPath.c_str(), openType);
    bool viaLink = false;
    const string path = NormalizeLogicalPath(rawPath, &viaLink);
    ScopedLock lock(*_lock);
    FSResult<EntryMeta> metaRet = GetEntryMeta(path);
    RETURN2_IF_FS_ERROR(metaRet.ec, FSFT_UNKNOWN, "get file [%s] entry meta failed", rawPath.c_str());
    const EntryMeta& meta = metaRet.result;
    if (!meta.IsFile()) {
        AUTIL_LOG(ERROR, "Path [%s] is not regular file!", rawPath.c_str());
        return {FSEC_ERROR, FSFT_UNKNOWN};
    }
    Storage* storage = GetStorage(meta, viaLink);
    return storage->DeduceFileType(PathUtil::NormalizePath(rawPath), meta.GetFullPhysicalPath(), openType);
}
// Ask the storage serving rawPath for the estimated locked memory when opening it with openType.
// Fails with FSEC_ERROR when the path is not a regular file.
FSResult<size_t> LogicalFileSystem::EstimateFileLockMemoryUse(const string& rawPath, FSOpenType openType) noexcept
{
    AUTIL_LOG(DEBUG, "File[%s], OpenType[%d]", rawPath.c_str(), openType);
    bool viaLink = false;
    const string path = NormalizeLogicalPath(rawPath, &viaLink);
    ScopedLock lock(*_lock);
    FSResult<EntryMeta> metaRet = GetEntryMeta(path);
    RETURN2_IF_FS_ERROR(metaRet.ec, 0ul, "get file [%s] entry meta failed", rawPath.c_str());
    const EntryMeta& meta = metaRet.result;
    if (!meta.IsFile()) {
        AUTIL_LOG(ERROR, "Path [%s] is not regular file!", rawPath.c_str());
        return {FSEC_ERROR, 0ul};
    }
    Storage* storage = GetStorage(meta, viaLink);
    // logical path should contain linkroot, for inside/outside cache & CompressFileAddressMapper check
    return storage->EstimateFileLockMemoryUse(PathUtil::NormalizePath(rawPath), meta.GetFullPhysicalPath(), openType,
                                              meta.GetLength());
}
// Ask the storage serving rawPath for the estimated memory-use change when the file moves from
// oldTemperature to newTemperature. Fails with FSEC_ERROR when the path is not a regular file.
FSResult<size_t> LogicalFileSystem::EstimateFileMemoryUseChange(const string& rawPath,
                                                                const std::string& oldTemperature,
                                                                const std::string& newTemperature) noexcept
{
    AUTIL_LOG(DEBUG, "File[%s], oldTemperature[%s], newTemperature[%s]", rawPath.c_str(), oldTemperature.c_str(),
              newTemperature.c_str());
    bool viaLink = false;
    const string path = NormalizeLogicalPath(rawPath, &viaLink);
    ScopedLock lock(*_lock);
    FSResult<EntryMeta> metaRet = GetEntryMeta(path);
    RETURN2_IF_FS_ERROR(metaRet.ec, 0ul, "get file [%s] entry meta failed", rawPath.c_str());
    const EntryMeta& meta = metaRet.result;
    if (!meta.IsFile()) {
        AUTIL_LOG(ERROR, "Path [%s] is not regular file!", rawPath.c_str());
        return {FSEC_ERROR, 0ul};
    }
    Storage* storage = GetStorage(meta, viaLink);
    // logical path should contain linkroot, for inside/outside cache & CompressFileAddressMapper check
    return storage->EstimateFileMemoryUseChange(PathUtil::NormalizePath(rawPath), meta.GetFullPhysicalPath(),
                                                oldTemperature, newTemperature, meta.GetLength());
}
// Forward the load-speed-limit switch to the load config list held by the options.
void LogicalFileSystem::SwitchLoadSpeedLimit(bool on) noexcept
{
    auto& loadConfigs = _options->loadConfigList;
    loadConfigs.SwitchLoadSpeedLimit(on);
}
// Set the lifecycle of a logical directory on the input storage, the output storage and the entry table
// builder. All three are always invoked; the result is true only when every one of them succeeded.
bool LogicalFileSystem::SetDirLifecycle(const string& rawPath, const string& lifecycle) noexcept
{
    const auto path = NormalizeLogicalPath(rawPath);
    ScopedLock lock(*_lock);
    auto inputOk = _inputStorage->SetDirLifecycle(path, lifecycle);
    auto outputOk = _outputStorage->SetDirLifecycle(path, lifecycle);
    auto builderEc = _entryTableBuilder->SetDirLifecycle(path, lifecycle);
    if (!inputOk || !outputOk) {
        return false;
    }
    return builderEc == FSEC_OK;
}
// Return the type of the storage that serves rawPath; a missing entry yields {FSEC_OK, FSST_UNKNOWN}.
FSResult<FSStorageType> LogicalFileSystem::GetStorageType(const std::string& rawPath) const noexcept
{
    bool viaLink = false;
    const auto path = NormalizeLogicalPath(rawPath, &viaLink);
    ScopedLock lock(*_lock);
    FSResult<EntryMeta> metaRet = GetEntryMeta(path);
    if (metaRet.ec == FSEC_OK) {
        return {FSEC_OK, GetStorage(metaRet.result, viaLink)->GetStorageType()};
    }
    if (metaRet.ec == FSEC_NOENT) {
        return {FSEC_OK, FSST_UNKNOWN};
    }
    AUTIL_LOG(ERROR, "get file [%s] entry meta failed", rawPath.c_str());
    return {metaRet.ec, FSST_UNKNOWN};
}
// Check that the entry-table record of rawPath agrees with what is on disk: the path must exist, have the
// same file/directory kind and, for a regular file outside a package, the same length.
// Returns FSEC_NOENT when the physical path is missing and FSEC_ERROR for any other mismatch or failure.
FSResult<void> LogicalFileSystem::Validate(const string& rawPath) const noexcept
{
    // TODO: support validate package file
    const string path = NormalizeLogicalPath(rawPath);
    ScopedLock lock(*_lock);
    FSResult<EntryMeta> metaRet = _entryTable->GetEntryMeta(path);
    if (metaRet.ec != FSEC_OK) {
        AUTIL_LOG(ERROR, "validate [%s] failed, not existed in entry table", rawPath.c_str());
        return FSEC_ERROR;
    }
    const EntryMeta& meta = metaRet.result;
    auto [statEc, pathMeta] = FslibWrapper::GetPathMeta(meta.GetFullPhysicalPath());
    if (statEc == FSEC_NOENT) {
        AUTIL_LOG(ERROR, "validate [%s] failed, not exist on disk [%s]", rawPath.c_str(),
                  meta.DebugString().c_str());
        return FSEC_NOENT;
    }
    if (statEc != FSEC_OK) {
        AUTIL_LOG(ERROR, "validate [%s] failed, get path meta error [%d]", rawPath.c_str(), statEc);
        return FSEC_ERROR;
    }
    // file/directory kind must match
    if (meta.IsDir() != (!pathMeta.isFile)) {
        AUTIL_LOG(ERROR, "validate [%s] failed, on disk isDir[%d], in entry table [%s]", rawPath.c_str(),
                  !pathMeta.isFile, meta.DebugString().c_str());
        return FSEC_ERROR;
    }
    // length is only compared for regular files stored outside a package
    const bool checkLength = meta.IsFile() && !meta.IsInPackage();
    if (checkLength && meta.GetLength() != pathMeta.length) {
        AUTIL_LOG(ERROR, "validate [%s] failed, on disk length[%ld], in entry table [%s]", rawPath.c_str(),
                  pathMeta.length, meta.DebugString().c_str());
        return FSEC_ERROR;
    }
    return FSEC_OK;
}
void LogicalFileSystem::CleanCache() noexcept
{
ScopedLock lock(*_lock);
_inputStorage->CleanCache();
_outputStorage->CleanCache();
}
// Rename a logical path in the entry table. The source entry must resolve successfully first.
FSResult<void> LogicalFileSystem::Rename(const std::string& rawSrcPath, const std::string& rawDestPath,
                                         FenceContext* fenceContext) noexcept
{
    // TODO: add ut & support recover
    AUTIL_LOG(INFO, "rename [%s] => [%s]", rawSrcPath.c_str(), rawDestPath.c_str());
    const auto srcPath = NormalizeLogicalPath(rawSrcPath);
    const auto destPath = NormalizeLogicalPath(rawDestPath);
    ScopedLock lock(*_lock);
    auto srcEc = GetEntryMeta(srcPath).ec;
    RETURN_IF_FS_ERROR(srcEc, "get src file [%s] entry meta failed", rawSrcPath.c_str());
    auto renameEc = _entryTable->Rename(srcPath, destPath, fenceContext);
    RETURN_IF_FS_ERROR(renameEc, "Rename failed, [%s] => [%s]", rawSrcPath.c_str(), rawDestPath.c_str());
    return FSEC_OK;
}
// Test helper: switch the root-link option and recompute the root-link path. The path is empty when the
// option is off, otherwise FILE_SYSTEM_ROOT_LINK_NAME, with "@<current seconds>" appended when
// rootLinkWithTs is set.
void LogicalFileSystem::TEST_SetUseRootLink(bool useRootLink) noexcept
{
    _options->useRootLink = useRootLink;
    if (!_options->useRootLink) {
        _rootLinkPath.clear();
        return;
    }
    _rootLinkPath = FILE_SYSTEM_ROOT_LINK_NAME;
    if (_options->rootLinkWithTs) {
        _rootLinkPath += "@" + StringUtil::toString(TimeUtility::currentTimeInSeconds());
    }
}
void LogicalFileSystem::TEST_MountLastVersion() noexcept(false)
{
static const string versionFileNamePrefix = "version.";
static const string entryTableFileNamePrefix = "entry_table.";
FileList fileList;
auto ec = FslibWrapper::ListDir(_outputRoot, fileList).Code();
THROW_IF_FS_ERROR(ec, "ListDir in TEST_MountLastVersion failed: %s", _outputRoot.c_str());
// TODO: rm
int32_t lastVersionId = -1;
for (const string& fileName : fileList) {
versionid_t versionId = 0;
if (autil::StringUtil::startsWith(fileName, versionFileNamePrefix) &&
autil::StringUtil::fromString((string)(fileName.substr(versionFileNamePrefix.size())), versionId)) {
lastVersionId = std::max(lastVersionId, versionId);
}
}
if (lastVersionId < 0) {
for (const string& fileName : fileList) {
versionid_t versionId = 0;
if (autil::StringUtil::startsWith(fileName, entryTableFileNamePrefix) &&
autil::StringUtil::fromString((string)(fileName.substr(entryTableFileNamePrefix.size())), versionId)) {
lastVersionId = std::max(lastVersionId, versionId);
}
}
}
THROW_IF_FS_ERROR(MountVersion(_outputRoot, lastVersionId, "", FSMT_READ_ONLY, nullptr), "mount version failed");
}
// Normalize a raw logical path and strip the root-link prefix when present.
//
// @param rawPath  logical (relative) path; absolute paths and URIs are rejected by an assert
// @param isLink   optional out-parameter; set to true when the path addressed the root link. It is left
//                 untouched otherwise, so callers initialize it to false.
// @return the normalized path without the root-link prefix (empty when the path is the root link itself)
//
// The root link is recognized only as a whole leading path component: the path either equals _rootLinkPath
// or continues with '/' right after it. A path that merely starts with the same characters
// (e.g. "<link>xyz/file") is a regular logical path and is returned unchanged.
std::string LogicalFileSystem::NormalizeLogicalPath(const std::string& rawPath, bool* isLink) const noexcept
{
    auto path = PathUtil::NormalizePath(rawPath);
    // logical fs should not get absolute path
    assert(path[0] != '/' && path.find("://") == string::npos);
    // TODO: need a more effective path format, such as link=ts@LOGICAL_PATH
    if (_rootLinkPath.empty()) {
        return path;
    }
    const size_t linkLen = _rootLinkPath.size();
    // Anchored prefix comparison: unlike find(...) == 0 it never scans past the first linkLen characters.
    const bool startsWithLink = path.size() >= linkLen && path.compare(0, linkLen, _rootLinkPath) == 0;
    if (startsWithLink && (path.size() == linkLen || path[linkLen] == '/')) {
        // Trim link path prefix together with the separator; erase clamps the count, so a path that is
        // exactly the root link becomes empty.
        path.erase(0, linkLen + 1);
        if (isLink) {
            *isLink = true;
        }
    }
    return path;
}
// Create a named memory reserver bound to this file system's memory controller.
util::MemoryReserverPtr LogicalFileSystem::CreateMemoryReserver(const std::string& name) noexcept
{
    util::MemoryReserverPtr reserver(new util::MemoryReserver(name, _memController));
    return reserver;
}
// Resolve a logical path to its full physical path; on failure the error code is returned with an empty
// string.
FSResult<string> LogicalFileSystem::GetPhysicalPath(const std::string& rawLogicalPath) const noexcept
{
    const std::string logicalPath = NormalizeLogicalPath(rawLogicalPath);
    ScopedLock lock(*_lock);
    FSResult<EntryMeta> metaRet = GetEntryMeta(logicalPath);
    if (metaRet.OK()) {
        return {FSEC_OK, metaRet.result.GetFullPhysicalPath()};
    }
    assert(metaRet.ec == FSEC_NOENT || metaRet.ec == FSEC_ERROR);
    return {metaRet.ec, ""};
}
// Test helper: run RecoverPackage using the physical listing of the directory as the file-list hint.
// The hint stays empty when the directory has no entry in the entry table.
FSResult<void> LogicalFileSystem::TEST_Recover(int32_t checkpoint, const std::string& rawLogicalDirPath) noexcept
{
    const std::string dirPath = NormalizeLogicalPath(rawLogicalDirPath);
    std::vector<std::string> physicalFiles;
    FSResult<EntryMeta> metaRet = GetEntryMeta(dirPath);
    if (metaRet.OK()) {
        const EntryMeta& dirMeta = metaRet.result;
        auto listEc = FslibWrapper::ListDir(dirMeta.GetFullPhysicalPath(), physicalFiles).Code();
        RETURN_IF_FS_ERROR(listEc, "List dir [%s] => [%s] failed", dirPath.c_str(),
                           dirMeta.GetFullPhysicalPath().c_str());
    }
    return RecoverPackage(checkpoint, rawLogicalDirPath, physicalFiles);
}
// Test helper: commit versionId with empty file/dir selections and no fencing, dumping
// finalDumpFileContent into finalDumpFileName.
FSResult<void> LogicalFileSystem::TEST_Commit(int versionId, const std::string& finalDumpFileName,
                                              const std::string& finalDumpFileContent) noexcept
{
    auto commitRet = CommitSelectedFilesAndDir(versionId, {}, {}, {}, finalDumpFileName, finalDumpFileContent,
                                               FenceContext::NoFence());
    return commitRet;
}
// Test helper: forward to the entry table to fetch the physical root, relative path, package flag and
// directory flag of a logical path.
bool LogicalFileSystem::TEST_GetPhysicalInfo(const std::string& logicalPath, std::string& physicalRoot,
                                             std::string& relativePath, bool& inPackage, bool& isDir) const noexcept
{
    ScopedLock lock(*_lock);
    const std::string normalized = NormalizeLogicalPath(logicalPath);
    const bool found = _entryTable->TEST_GetPhysicalInfo(normalized, physicalRoot, relativePath, inPackage, isDir);
    return found;
}
// Set the default local and remote root paths (normalized) on the load config list.
void LogicalFileSystem::SetDefaultRootPath(const std::string& defaultLocalPath,
                                           const std::string& defaultRemotePath) noexcept
{
    AUTIL_LOG(INFO, "fs root: local[%s], remote[%s]", defaultLocalPath.c_str(), defaultRemotePath.c_str());
    ScopedLock lock(*_lock);
    const auto localRoot = PathUtil::NormalizePath(defaultLocalPath);
    const auto remoteRoot = PathUtil::NormalizePath(defaultRemotePath);
    _options->loadConfigList.SetDefaultRootPath(localRoot, remoteRoot);
}
// Short description of this file system: "<name>:<output root>".
std::string LogicalFileSystem::DebugString() const noexcept
{
    std::string description = _name;
    description += ":";
    description += _outputRoot;
    return description;
}
// Derive the package meta file path from a package data file name: everything before the first
// PACKAGE_FILE_DATA_SUFFIX occurrence, followed by PACKAGE_FILE_META_SUFFIX.
// The data suffix is asserted to be present.
std::string LogicalFileSystem::GetPackageMetaFilePath(const std::string& packageDataFileName) noexcept
{
    const auto suffixPos = packageDataFileName.find(PACKAGE_FILE_DATA_SUFFIX);
    assert(suffixPos != std::string::npos);
    std::string metaPath = packageDataFileName.substr(0, suffixPos);
    metaPath += PACKAGE_FILE_META_SUFFIX;
    return metaPath;
}
// Pick the storage that serves an entry: link paths always use the input storage; mutable entries use the
// output storage; otherwise, for an online file system, the output storage is used when it still holds the
// file in its cache, and the input storage in every remaining case.
Storage* LogicalFileSystem::GetStorage(const EntryMeta& entryMeta, bool isLink) const noexcept
{
    if (isLink) {
        return _inputStorage.get();
    }
    // online will use output storage even though flushed. eg: rt segment deletionmap
    // see ut OnlinePartitionInteTest/OnlinePartitionInteTestMode.TestRtRemoveRtDoc/0 for more detail
    const bool servedByOutput =
        entryMeta.IsMutable() ||
        (!_options->isOffline && _outputStorage->IsExistInCache(entryMeta.GetLogicalPath()));
    if (servedByOutput) {
        return _outputStorage.get();
    }
    return _inputStorage.get();
}
// Look up the entry meta of a (normalized) logical path through EntryTable::GetEntryMetaMayLazy.
FSResult<EntryMeta> LogicalFileSystem::GetEntryMeta(const std::string& logicalPath) const noexcept
{
    FSResult<EntryMeta> metaRet = _entryTable->GetEntryMetaMayLazy(logicalPath);
    return metaRet;
}
// Create a reader for a slice file held by the output storage.
// For compatibility, a missing entry or a missing slice file is reported as FSEC_OK with a null reader.
FSResult<std::shared_ptr<FileReader>>
LogicalFileSystem::CreateSliceFileReader(const string& path, const ReaderOption& readerOption) noexcept
{
    AUTIL_LOG(DEBUG, "CreateSliceFileReader[%s]", path.c_str());
    ScopedLock lock(*_lock);
    auto [metaEc, sliceMeta] = _entryTable->GetEntryMeta(path);
    if (metaEc == FSEC_NOENT) {
        return {FSEC_OK, std::shared_ptr<FileReader>()}; // for compatibility
    }
    RETURN2_IF_FS_ERROR(metaEc, std::shared_ptr<FileReader>(), "GetEntryMeta [%s] failed", path.c_str());
    auto [readerEc, reader] =
        _outputStorage->CreateSliceFileReader(path, sliceMeta.GetFullPhysicalPath(), readerOption);
    if (readerEc == FSEC_NOENT) {
        return {FSEC_OK, std::shared_ptr<FileReader>()}; // for compatibility
    }
    RETURN2_IF_FS_ERROR(readerEc, std::shared_ptr<FileReader>(), "CreateSliceFileReader[%s->%s] failed",
                        path.c_str(), sliceMeta.GetFullPhysicalPath().c_str());
    if (reader) {
        reader->InitMetricReporter(GetFileSystemMetricsReporter());
    }
    return {FSEC_OK, reader};
}
// Create a writer for a slice file in the output storage and register the file in the entry table as an
// in-memory file rooted at the entry table's output root.
FSResult<std::shared_ptr<FileWriter>>
LogicalFileSystem::CreateSliceFileWriter(const string& path, const WriterOption& writerOption) noexcept
{
    AUTIL_LOG(DEBUG, "CreateSliceFileWriter[%s], sliceLen[%lu], sliceNum[%d]", path.c_str(), writerOption.sliceLen,
              writerOption.sliceNum);
    ScopedLock lock(*_lock);
    auto [createEc, sliceMeta] = CreateEntryMeta(path);
    RETURN2_IF_FS_ERROR(createEc, std::shared_ptr<FileWriter>(), "CreateEntryMeta failed, path[%s]", path.c_str());
    // @qingran all not flushed on disk files include slice file should not display in entry_table.X
    // slice file does not need fs patch. it is a pure memory file without sync to disk
    sliceMeta.SetIsMemFile(true);
    sliceMeta.SetPhysicalRoot(_entryTable->GetPhysicalRootPointer(_entryTable->GetOutputRoot()));
    auto writerRet = _outputStorage->CreateSliceFileWriter(path, sliceMeta.GetFullPhysicalPath(), writerOption);
    RETURN2_IF_FS_ERROR(writerRet.Code(), std::shared_ptr<FileWriter>(), "CreateSliceFileWriter[%s] => [%s] failed",
                        path.c_str(), sliceMeta.GetFullPhysicalPath().c_str());
    // The entry is added only after the storage has created the writer.
    auto addEc = _entryTable->AddEntryMeta(sliceMeta).ec;
    RETURN2_IF_FS_ERROR(addEc, std::shared_ptr<FileWriter>(), "Entry table add SliceFile failed, [%s]",
                        sliceMeta.DebugString().c_str());
    return writerRet;
}
// Create a writer for an in-memory file in the output storage and register the file in the entry table
// with length writerOption.fileLength. A file already present in the cache is removed first.
FSResult<std::shared_ptr<FileWriter>> LogicalFileSystem::CreateMemFileWriter(const string& path,
                                                                             const WriterOption& writerOption) noexcept
{
    ScopedLock lock(*_lock);
    if (IsExistInCache(path)) {
        AUTIL_LOG(INFO, "file [%s] already exist, remove it before CreateFileWriter!", path.c_str());
        RETURN2_IF_FS_ERROR(RemoveFile(path, RemoveOption()), std::shared_ptr<FileWriter>(), "remove file [%s] failed",
                            path.c_str());
    }
    auto [createEc, memFileMeta] = CreateEntryMeta(path);
    RETURN2_IF_FS_ERROR(createEc, std::shared_ptr<FileWriter>(), "CreateEntryMeta failed, path[%s]", path.c_str());
    auto writerRet = _outputStorage->CreateMemFileWriter(path, memFileMeta.GetFullPhysicalPath(), writerOption);
    RETURN2_IF_FS_ERROR(writerRet.ec, std::shared_ptr<FileWriter>(), "Create file [%s] => [%s] writer failed",
                        path.c_str(), memFileMeta.GetFullPhysicalPath().c_str());
    memFileMeta.SetLength(writerOption.fileLength);
    memFileMeta.SetIsMemFile(true);
    auto addEc = _entryTable->AddEntryMeta(memFileMeta).ec;
    RETURN2_IF_FS_ERROR(addEc, std::shared_ptr<FileWriter>(), "Entry table add InMemoryFile failed, [%s]",
                        memFileMeta.DebugString().c_str());
    return writerRet;
}
// Create another LogicalFileSystem on the same output root that shares this one's lock, entry table,
// entry table builder and metrics reporter, and initialize it with the same options.
FSResult<std::unique_ptr<IFileSystem>>
LogicalFileSystem::CreateThreadOwnFileSystem(const std::string& name) const noexcept
{
    auto threadFs = std::make_unique<LogicalFileSystem>(name, _outputRoot, nullptr);
    // Shared state is wired in before Init() runs.
    threadFs->_lock = _lock;
    threadFs->_entryTableBuilder = _entryTableBuilder;
    threadFs->_entryTable = _entryTable;
    threadFs->_metricsReporter = _metricsReporter;
    auto initEc = threadFs->Init(*_options);
    if (initEc != FSEC_OK) {
        AUTIL_LOG(ERROR, "init threadFileSystem failed");
        return {initEc, nullptr};
    }
    return {FSEC_OK, std::move(threadFs)};
}
// Test helper: forward the mmap-support switch to whichever of the two storages exist.
void LogicalFileSystem::TEST_EnableSupportMmap(bool isEnable)
{
    if (_inputStorage) {
        _inputStorage->TEST_EnableSupportMmap(isEnable);
    }
    if (_outputStorage) {
        _outputStorage->TEST_EnableSupportMmap(isEnable);
    }
}
// Compute and remember the total file size of a version.
// The version is mounted read-only into a separate, temporary entry table; the result is stored in
// _versionFileSizes. Nothing is done for INVALID_VERSIONID or for a version whose size is already known.
FSResult<void> LogicalFileSystem::CalculateVersionFileSize(const std::string& rawPhysicalRoot,
                                                           const std::string& rawLogicalPath,
                                                           versionid_t versionId) noexcept
{
    if (versionId == INVALID_VERSIONID) {
        return FSEC_OK;
    }
    {
        // The lock only guards the cache lookup; the mount below runs without it.
        ScopedLock lock(*_lock);
        if (_versionFileSizes.count(versionId) > 0) {
            return FSEC_OK;
        }
    }
    const std::string logicalPath = NormalizeLogicalPath(rawLogicalPath);
    const std::string physicalRoot = PathUtil::NormalizePath(rawPhysicalRoot);
    auto builder = std::make_unique<EntryTableBuilder>();
    auto table = builder->CreateEntryTable(_name, _outputRoot, _options);
    assert(_outputRoot == table->GetOutputRoot());
    auto mountEc = builder->MountVersion(physicalRoot, versionId, logicalPath, indexlib::file_system::FSMT_READ_ONLY);
    if (mountEc != FSEC_OK) {
        AUTIL_LOG(ERROR, "mount version failed, physicalRoot [%s], vesion [%d], logicalPath [%s], ec[%d]",
                  physicalRoot.c_str(), versionId, logicalPath.c_str(), mountEc);
        return mountEc;
    }
    const size_t totalFileSize = table->CalculateFileSize();
    ScopedLock lock(*_lock);
    _versionFileSizes[versionId] = totalFileSize;
    return FSEC_OK;
}
// Return the size remembered for versionId in _versionFileSizes, or 0 when none has been recorded.
size_t LogicalFileSystem::GetVersionFileSize(versionid_t versionId) const noexcept
{
    ScopedLock lock(*_lock);
    const auto found = _versionFileSizes.find(versionId);
    if (found == _versionFileSizes.end()) {
        return 0;
    }
    return found->second;
}
}} // namespace indexlib::file_system