aios/storage/indexlib/file_system/fslib/FslibWrapper.cpp (1,028 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "indexlib/file_system/fslib/FslibWrapper.h"
#include <assert.h>
#include <errno.h>
#include <iosfwd>
#include <list>
#include <memory>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <vector>
#include "alog/Logger.h"
#include "autil/CommonMacros.h"
#include "autil/EnvUtil.h"
#include "autil/Lock.h"
#include "autil/ThreadPool.h"
#include "autil/TimeUtility.h"
#include "fslib/fs/File.h"
#include "fslib/fs/FileSystem.h"
#include "fslib/fs/MMapFile.h"
#include "indexlib/file_system/FileSystemDefine.h"
#include "indexlib/file_system/fslib/FslibCommonFileWrapper.h"
#include "indexlib/file_system/fslib/FslibFileWrapper.h"
#include "indexlib/util/EpochIdUtil.h"
#include "indexlib/util/Exception.h"
#include "indexlib/util/PathUtil.h"
using namespace std;
using namespace autil;
using namespace indexlib::util;
namespace indexlib { namespace file_system {
AUTIL_LOG_SETUP(indexlib.file_system, FslibWrapper);
IOConfig FslibWrapper::_mergeIOConfig = IOConfig();
ThreadPoolPtr FslibWrapper::_writePool;
ThreadPoolPtr FslibWrapper::_readPool;
autil::ThreadMutex FslibWrapper::_writePoolLock;
autil::ThreadMutex FslibWrapper::_readPoolLock;
const string FslibWrapper::EPOCHID = "epochId";
const string FslibWrapper::FENCE_INLINE_FILE = string("fence.inline") + TEMP_FILE_SUFFIX;
const char FslibWrapper::FENCE_ARGS_SEP = '\0';
static autil::ThreadMutex TEST_panguLock;
void FslibWrapper::Reset() noexcept
{
FslibWrapper::_mergeIOConfig = IOConfig();
FslibWrapper::_writePool.reset();
FslibWrapper::_readPool.reset();
}
FSResult<void> FslibWrapper::DeleteFile(const string& path, const DeleteOption& deleteOption) noexcept
{
return Delete(path, deleteOption);
}
FSResult<void> FslibWrapper::DeleteDir(const string& path, const DeleteOption& deleteOption) noexcept
{
return Delete(path, deleteOption);
}
FSResult<void> FslibWrapper::Delete(const string& path, const DeleteOption& deleteOption) noexcept
{
auto ec = Delete(path, deleteOption.fenceContext).Code();
if (deleteOption.mayNonExist) {
if (ec == FSEC_NOENT) {
AUTIL_LOG(INFO, "target remove path [%s] not exist!", path.c_str());
return FSEC_OK;
} else if (ec != FSEC_OK) {
return FSEC_ERROR;
}
AUTIL_LOG(INFO, "delete exist path: %s", path.c_str());
}
return ec;
}
FSResult<void> FslibWrapper::Delete(const string& path, FenceContext* fenceContext) noexcept
{
if (fenceContext) {
return DeleteFencing(path, fenceContext);
}
fslib::ErrorCode ec = fslib::fs::FileSystem::remove(path);
if (ec == fslib::EC_OK) {
return FSEC_OK;
}
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(DEBUG, "delete [%s] failed, no entry", path.c_str());
return FSEC_NOENT;
}
AUTIL_LOG(ERROR, "delete [%s] failed, ec[%d]", path.c_str(), ec);
return FSEC_ERROR;
}
FSResult<void> FslibWrapper::AtomicStore(const string& filePath, const char* data, size_t len, bool removeIfExist,
FenceContext* fenceContext) noexcept
{
// @uniqFlag is to make tmp file unique for each worker
string uniqFlag = fenceContext ? ("e" + fenceContext->epochId)
: ("t" + std::to_string(autil::TimeUtility::currentTimeInMicroSeconds()));
const string tempFilePath = filePath + "." + uniqFlag + TEMP_FILE_SUFFIX;
if (removeIfExist) {
bool isExist = false;
auto ret = IsExist(filePath, isExist);
if (ret != FSEC_OK) {
AUTIL_LOG(WARN, "isExist failed of file[%s]", filePath.c_str());
return ret;
}
if (isExist) {
ret = Delete(filePath, fenceContext).Code();
if (ret != FSEC_OK) {
AUTIL_LOG(WARN, "delete file failed when atomic store, file[%s]", filePath.c_str());
return ret;
}
}
ret = IsExist(tempFilePath, isExist);
if (ret != FSEC_OK) {
AUTIL_LOG(WARN, "isExist failed of file[%s]", tempFilePath.c_str());
return ret;
}
if (isExist) {
ret = Delete(tempFilePath, FenceContext::NoFence()).Code();
if (ret != FSEC_OK) {
AUTIL_LOG(WARN, "delete file failed when atomic store, file[%s]", tempFilePath.c_str());
return ret;
}
}
}
auto ret = Store(tempFilePath, data, len);
if (ret != FSEC_OK) {
AUTIL_LOG(ERROR, "store file failed, file[%s]", tempFilePath.c_str());
return ret;
}
auto ec = RenameWithFenceContext(tempFilePath, filePath, fenceContext).Code();
if (ec != FSEC_OK) {
AUTIL_LOG(ERROR, "rename file: [%s] to file: [%s] failed, error: %d", tempFilePath.c_str(), filePath.c_str(),
ec);
return ec;
}
return FSEC_OK;
}
bool FslibWrapper::UpdateFenceInlineFile(FenceContext* fenceContext) noexcept
{
const string& epochId = fenceContext->epochId;
string inlineFilePath = JoinPath(fenceContext->fenceHintPath, FENCE_INLINE_FILE);
string currentEpochId;
auto ec = StatPanguInlineFile(inlineFilePath, currentEpochId).Code();
if (ec == FSEC_NOENT) {
bool fenceHintPathExist = false;
auto ec1 = IsExist(fenceContext->fenceHintPath, fenceHintPathExist).Code();
if (ec1 != FSEC_OK) {
AUTIL_LOG(ERROR, "fence hint path [%s] is exsited failed, ec [%d]", fenceContext->fenceHintPath.c_str(),
ec1);
return false;
}
if (!fenceHintPathExist) {
ec1 = MkDirIfNotExist(fenceContext->fenceHintPath);
if (ec1 != FSEC_OK) {
AUTIL_LOG(ERROR, "fence hint path [%s] create failed, ec [%d]", fenceContext->fenceHintPath.c_str(),
ec1);
return false;
}
}
ec1 = CreatePanguInlineFile(inlineFilePath);
if (ec1 != FSEC_OK && ec1 != FSEC_EXIST) {
AUTIL_LOG(ERROR, " failed as file [%s] not exist, and create it failed, ec is [%d]", inlineFilePath.c_str(),
ec1);
return false;
}
ec1 = UpdatePanguInlineFileCAS(inlineFilePath, "", epochId);
if (ec1 != FSEC_OK) {
AUTIL_LOG(ERROR, " failed as update file [%s] to epochId [%s], failed, ec is [%d]", inlineFilePath.c_str(),
epochId.c_str(), ec1);
return false;
}
currentEpochId = epochId;
} else if (ec != FSEC_OK) {
AUTIL_LOG(ERROR, "failed as stat file [%s] failed, ec is [%d]", inlineFilePath.c_str(), ec);
return false;
}
if (!EpochIdUtil::CompareGE(epochId, currentEpochId)) {
AUTIL_LOG(ERROR, "epochId [%s] is smaller than current epochId [%s], cannot operate file", epochId.c_str(),
currentEpochId.c_str());
return false;
}
if (epochId != currentEpochId) {
auto ec = UpdatePanguInlineFileCAS(inlineFilePath, currentEpochId, epochId).Code();
if (ec != FSEC_OK) {
AUTIL_LOG(ERROR, " failed as update file [%s] to epochId [%s], failed, ec is [%d]", inlineFilePath.c_str(),
epochId.c_str(), ec);
return false;
}
AUTIL_LOG(INFO, "update fence inline file [%s] content from [%s] to [%s]", inlineFilePath.c_str(),
currentEpochId.c_str(), epochId.c_str());
}
return true;
}
FSResult<void> FslibWrapper::DeleteFencing(const std::string& path, FenceContext* fenceContext) noexcept
{
assert(fenceContext);
if (fenceContext->usePangu == true) {
// args is @inlineFilePath+'\0'+@epochId
if (fenceContext->hasPrepareHintFile == false) {
if (!UpdateFenceInlineFile(fenceContext)) {
AUTIL_LOG(ERROR, "fencing delete path [%s] failed, can't operate", path.c_str());
return FSEC_ERROR;
}
fenceContext->hasPrepareHintFile = true;
}
stringstream args;
args << JoinPath(fenceContext->fenceHintPath, FENCE_INLINE_FILE) << FENCE_ARGS_SEP << fenceContext->epochId;
return DeletePanguPathCAS(path, args.str());
} else {
assert(false);
return FSEC_ERROR;
}
}
FSResult<void> FslibWrapper::Store(const string& filePath, const char* data, size_t size) noexcept
{
fslib::fs::FilePtr file(fslib::fs::FileSystem::openFile(filePath, fslib::WRITE, false, size));
if (unlikely(!file)) {
AUTIL_LOG(ERROR, "open file failed, file[%s]", filePath.c_str());
return FSEC_ERROR;
}
if (!file->isOpened()) {
fslib::ErrorCode ec = file->getLastError();
if (ec == fslib::EC_EXIST) {
AUTIL_LOG(DEBUG, "file already exist, file[%s]", filePath.c_str());
return FSEC_EXIST;
}
AUTIL_LOG(ERROR, "open file failed, file[%s], ec[%d]", filePath.c_str(), ec);
return FSEC_ERROR;
}
if (unlikely(fslib::fs::FileSystem::GENERATE_ERROR("write", file->getFileName()) != fslib::EC_OK)) {
return FSEC_ERROR;
}
ssize_t writeBytes = file->write(data, size);
if (writeBytes != static_cast<ssize_t>(size)) {
AUTIL_LOG(ERROR, "write file: [%s] failed, error: %s", filePath.c_str(),
GetErrorString(file->getLastError()).c_str());
return FSEC_ERROR;
}
fslib::ErrorCode ec = file->close();
if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "close file: [%s] failed, error: %s", filePath.c_str(), GetErrorString(ec).c_str());
return FSEC_ERROR;
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::AtomicLoad(const string& filePath, string& fileContent) noexcept
{
return Load(filePath, fileContent);
}
FSResult<void> FslibWrapper::Load(const string& filePath, string& fileContent) noexcept
{
fslib::fs::FilePtr file(fslib::fs::FileSystem::openFile(filePath, fslib::READ));
if (unlikely(!file)) {
AUTIL_LOG(ERROR, "open file failed, file[%s]", filePath.c_str());
return FSEC_ERROR;
}
if (!file->isOpened()) {
fslib::ErrorCode ec = file->getLastError();
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(DEBUG, "file not exist [%s].", filePath.c_str());
return FSEC_NOENT;
} else {
AUTIL_LOG(ERROR, "file not exist [%s]. error: %s", filePath.c_str(),
GetErrorString(file->getLastError()).c_str());
return FSEC_ERROR;
}
}
fileContent.clear();
constexpr ssize_t bufSize = 128 * 1024;
char buffer[bufSize];
ssize_t readBytes = 0;
do {
readBytes = file->read(buffer, bufSize);
if (readBytes > 0) {
if (fileContent.empty()) {
fileContent = string(buffer, readBytes);
} else {
fileContent.append(buffer, readBytes);
}
}
} while (readBytes == bufSize);
if (!file->isEof()) {
AUTIL_LOG(ERROR, "Read file[%s] FAILED, error: [%s]", filePath.c_str(),
GetErrorString(file->getLastError()).c_str());
return FSEC_ERROR;
}
fslib::ErrorCode ret = file->close();
if (ret != fslib::EC_OK) {
AUTIL_LOG(ERROR, "Close file[%s] FAILED, error: [%s]", filePath.c_str(), GetErrorString(ret).c_str());
return ParseFromFslibEC(ret);
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::AtomicLoad(FslibFileWrapper* file, string& fileContent) noexcept
{
fileContent.clear();
if (!file) {
AUTIL_LOG(ERROR, "empty file point");
return FSEC_ERROR;
}
size_t readLength = 0;
int64_t offset = 0;
constexpr ssize_t bufSize = 128 * 1024;
char buffer[bufSize];
while (true) {
auto ret = file->PRead(buffer, bufSize, offset, readLength);
if (ret != FSEC_OK) {
AUTIL_LOG(ERROR, "pread failed, ret[%d]", static_cast<int>(ret));
return FSEC_ERROR;
}
if (readLength > 0) {
if (fileContent.empty()) {
fileContent = string(buffer, readLength);
} else {
fileContent.append(buffer, readLength);
}
offset += readLength;
} else {
break;
}
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::OpenFile(const string& fileName, fslib::Flag mode, bool useDirectIO, ssize_t fileLength,
FslibFileWrapper*& fileWrapper) noexcept
{
AUTIL_LOG(DEBUG, "OpenFile [%s], mode[%d]", fileName.c_str(), mode);
fslib::fs::File* file = fslib::fs::FileSystem::openFile(fileName, mode, useDirectIO, fileLength);
if (unlikely(!file)) {
AUTIL_LOG(ERROR, "Open file: [%s] FAILED", fileName.c_str());
return FSEC_ERROR;
}
if (!file->isOpened()) {
fslib::ErrorCode ec = file->getLastError();
delete file;
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(ERROR, "File [%s] not exist.", fileName.c_str());
return FSEC_NOENT;
}
AUTIL_LOG(ERROR, "Open file: [%s] FAILED, ec[%d]", fileName.c_str(), ec);
return ParseFromFslibEC(ec);
}
fileWrapper = new FslibCommonFileWrapper(file, useDirectIO);
return FSEC_OK;
}
FSResult<void> FslibWrapper::MmapFile(const string& fileName, fslib::Flag openMode, char* start, int64_t length,
int prot, int mapFlag, off_t offset, ssize_t fileLength,
fslib::fs::MMapFile*& mmapFile) noexcept
{
AUTIL_LOG(DEBUG, "MmapFile [%s], mode[%d]", fileName.c_str(), openMode);
if (openMode == fslib::READ) {
if (length == -1) {
auto ret = GetFileLength(fileName, length);
if (!ret.OK()) {
return ret;
}
length = fileLength > 0 ? fileLength : length;
}
if (length == 0) {
mmapFile = new fslib::fs::MMapFile(fileName, -1, NULL, 0, 0, fslib::EC_OK);
return FSEC_OK;
}
}
assert(length != (size_t)-1 && length != 0);
fslib::fs::MMapFile* file =
fslib::fs::FileSystem::mmapFile(fileName, openMode, start, length, prot, mapFlag, offset, fileLength);
if (!file) {
AUTIL_LOG(ERROR, "Mmap file: [%s] FAILED", fileName.c_str());
return FSEC_ERROR;
}
if (!file->isOpened()) {
fslib::ErrorCode ec = file->getLastError();
assert(ec != fslib::EC_OK);
delete file;
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(ERROR, "Mmap File: [%s] not exist.", fileName.c_str());
return FSEC_NOENT;
} else if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "Mmap file: [%s] FAILED, ec[%d]", fileName.c_str(), ec);
return FSEC_ERROR;
}
}
mmapFile = file;
return FSEC_OK;
}
FSResult<void> FslibWrapper::IsExist(const string& filePath, bool& ret) noexcept
{
AUTIL_LOG(TRACE1, "IsExist [%s]", filePath.c_str());
fslib::ErrorCode ec = fslib::fs::FileSystem::isExist(filePath);
if (ec == fslib::EC_TRUE) {
ret = true;
return FSEC_OK;
} else if (ec == fslib::EC_FALSE) {
ret = false;
return FSEC_OK;
}
AUTIL_LOG(WARN, "Determine file existence failed, file [%s], error: %s", filePath.c_str(),
GetErrorString(ec).c_str());
return FSEC_ERROR;
}
FSResult<void> FslibWrapper::IsDir(const string& path, bool& ret) noexcept
{
fslib::ErrorCode ec = fslib::fs::FileSystem::isDirectory(path);
if (ec == fslib::EC_TRUE) {
ret = true;
return FSEC_OK;
} else if (ec == fslib::EC_FALSE || ec == fslib::EC_NOENT) {
ret = false;
return FSEC_OK;
}
AUTIL_LOG(WARN, "Determine is directory of path [%s] FAILED: [%s]", path.c_str(), GetErrorString(ec).c_str());
ret = false;
return FSEC_ERROR;
}
FSResult<void> FslibWrapper::IsFile(const string& path, bool& ret) noexcept
{
fslib::ErrorCode ec = fslib::fs::FileSystem::isFile(path);
if (ec == fslib::EC_TRUE) {
ret = true;
return FSEC_OK;
} else if (ec == fslib::EC_FALSE || ec == fslib::EC_NOENT) {
ret = false;
return FSEC_OK;
}
AUTIL_LOG(WARN, "Determine is file of path [%s] FAILED: [%s]", path.c_str(), GetErrorString(ec).c_str());
ret = false;
return FSEC_ERROR;
}
bool FslibWrapper::IsTempFile(const string& filePath) noexcept
{
size_t pos = filePath.rfind('.');
return (pos != string::npos && filePath.substr(pos) == string(TEMP_FILE_SUFFIX));
}
FSResult<void> FslibWrapper::ListDirRecursive(const string& path, FileList& fileList) noexcept
{
list<string> pathList;
string relativeDir;
do {
fslib::EntryList tmpList;
string absoluteDir = PathUtil::JoinPath(path, relativeDir);
fslib::ErrorCode ec = fslib::fs::FileSystem::listDir(absoluteDir, tmpList);
if (ec != fslib::EC_OK) {
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(WARN, "path not exist, path[%s]", absoluteDir.c_str());
return FSEC_NOENT;
}
if (ec == fslib::EC_NOTDIR) {
AUTIL_LOG(ERROR, "path not dir, path[%s]", absoluteDir.c_str());
return FSEC_NOTDIR;
}
AUTIL_LOG(ERROR, "list dir failed, path[%s], ec[%d]", absoluteDir.c_str(), ec);
return FSEC_ERROR;
}
fslib::EntryList::iterator it = tmpList.begin();
for (; it != tmpList.end(); it++) {
if (IsTempFile(it->entryName)) {
continue;
}
string tmpName = PathUtil::JoinPath(relativeDir, it->entryName);
if (it->isDir) {
pathList.push_back(tmpName);
fileList.push_back(PathUtil::NormalizeDir(tmpName));
} else {
fileList.push_back(tmpName);
}
}
if (pathList.empty()) {
break;
}
relativeDir = pathList.front();
pathList.pop_front();
} while (true);
return FSEC_OK;
}
FSResult<void> FslibWrapper::ListDir(const string& dirPath, FileList& fileList) noexcept
{
fslib::ErrorCode ec = fslib::fs::FileSystem::listDir(dirPath, fileList);
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(WARN, "dir not exist, path[%s]", dirPath.c_str());
return FSEC_NOENT;
} else if (ec == fslib::EC_NOTDIR) {
AUTIL_LOG(ERROR, "path is not a dir, path[%s]", dirPath.c_str());
return FSEC_NOTDIR;
} else if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "list dir failed, path[%s], ec[%d]", dirPath.c_str(), ec);
return FSEC_ERROR;
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::ListDir(const string& dirPath, fslib::EntryList& entryList) noexcept
{
fslib::ErrorCode ec = fslib::fs::FileSystem::listDir(dirPath, entryList);
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(WARN, "dir not exist, path[%s]", dirPath.c_str());
return FSEC_NOENT;
} else if (ec == fslib::EC_NOTDIR) {
AUTIL_LOG(ERROR, "path is not a dir, path[%s]", dirPath.c_str());
return FSEC_NOTDIR;
} else if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "list dir failed, path[%s], ec[%d]", dirPath.c_str(), ec);
return FSEC_ERROR;
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::MkDir(const string& dirPath, bool recursive, bool mayExist) noexcept
{
fslib::ErrorCode ec = fslib::fs::FileSystem::mkDir(dirPath, recursive);
if (ec == fslib::EC_EXIST) {
if (mayExist) {
return FSEC_OK;
}
AUTIL_LOG(ERROR, "dir already exist, path[%s]", dirPath.c_str());
return FSEC_EXIST;
} else if (ec == fslib::EC_NOENT) {
AUTIL_LOG(ERROR, "parrent dir not exist, path[%s]", dirPath.c_str());
return FSEC_NOTDIR;
} else if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "make dir failed, path[%s], ec[%d]", dirPath.c_str(), ec);
return FSEC_ERROR;
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::MkDirIfNotExist(const string& dirPath) noexcept
{
fslib::ErrorCode ec = fslib::fs::FileSystem::mkDir(dirPath, true);
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(ERROR, "make dir failed, path[%s], ec[%d]", dirPath.c_str(), ec);
return FSEC_NOENT;
}
if (ec != fslib::EC_OK && ec != fslib::EC_EXIST) {
AUTIL_LOG(ERROR, "make dir failed, path[%s], ec[%d]", dirPath.c_str(), ec);
return FSEC_ERROR;
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::Rename(const string& srcName, const string& dstName) noexcept
{
return RenameWithFenceContext(srcName, dstName, FenceContext::NoFence());
}
FSResult<void> FslibWrapper::RenameWithFenceContext(const string& srcName, const string& dstName,
FenceContext* fenceContext) noexcept
{
if (fenceContext) {
return RenameFencing(srcName, dstName, fenceContext);
}
fslib::ErrorCode ec = fslib::fs::FileSystem::rename(srcName, dstName);
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(WARN, "rename [%s] to [%s] failed, src not exist", srcName.c_str(), dstName.c_str());
return FSEC_NOENT;
} else if (ec == fslib::EC_EXIST) {
AUTIL_LOG(WARN, "rename [%s] to [%s] failed, dst exist", srcName.c_str(), dstName.c_str());
return FSEC_EXIST;
} else if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "rename [%s] to [%s] failed, ec[%d]", srcName.c_str(), dstName.c_str(), ec);
return FSEC_ERROR;
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::RenameFencing(const string& srcName, const string& dstName,
FenceContext* fenceContext) noexcept
{
if (srcName.find(TEMP_FILE_SUFFIX) == string::npos) {
AUTIL_LOG(ERROR, "rename fenceing only support rename temp src");
return FSEC_ERROR;
}
if (fenceContext->usePangu == true) {
// args is @dstName + '\0' + @inlineFilePath+'\0'+@epochId
if (fenceContext->hasPrepareHintFile == false) {
if (!UpdateFenceInlineFile(fenceContext)) {
AUTIL_LOG(ERROR, "fencing rename srcName [%s] to dest [%s] failed, can't operate", srcName.c_str(),
dstName.c_str());
return FSEC_ERROR;
}
fenceContext->hasPrepareHintFile = true;
}
stringstream args;
args << dstName << FENCE_ARGS_SEP << JoinPath(fenceContext->fenceHintPath, FENCE_INLINE_FILE) << FENCE_ARGS_SEP
<< fenceContext->epochId;
return RenamePanguPathCAS(srcName, args.str());
} else {
assert(false);
return FSEC_ERROR;
}
}
FSResult<void> FslibWrapper::Copy(const string& srcName, const string& dstName) noexcept
{
fslib::ErrorCode ec = fslib::fs::FileSystem::copy(srcName, dstName);
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(ERROR, "source file not existe, file[%s]", srcName.c_str());
return FSEC_NOENT;
} else if (ec == fslib::EC_EXIST) {
AUTIL_LOG(INFO, "dest file already exist, file[%s]", dstName.c_str());
return FSEC_EXIST;
} else if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "copy [%s] to [%s] failed, ec[%d]", srcName.c_str(), dstName.c_str(), ec);
return FSEC_ERROR;
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::GetFileMeta(const string& fileName, fslib::FileMeta& fileMeta) noexcept
{
AUTIL_LOG(TRACE1, "GetFileMeta [%s]", fileName.c_str());
fslib::ErrorCode ec = fslib::fs::FileSystem::getFileMeta(fileName, fileMeta);
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(INFO, "file not exist, file[%s]", fileName.c_str());
return FSEC_NOENT;
} else if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "get file meta failed, file[%s], ec[%d]", fileName.c_str(), ec);
return FSEC_ERROR;
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::GetPathMeta(const string& path, fslib::PathMeta& pathMeta) noexcept
{
AUTIL_LOG(TRACE1, "GetPathMeta [%s]", path.c_str());
fslib::ErrorCode ec = fslib::fs::FileSystem::getPathMeta(path, pathMeta);
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(INFO, "path not exist, path[%s]", path.c_str());
return FSEC_NOENT;
} else if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "get path meta failed, path[%s], ec[%d]", path.c_str(), ec);
return FSEC_ERROR;
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::GetFileLength(const string& fileName, int64_t& fileLength) noexcept
{
fslib::FileMeta fileMeta;
fileMeta.fileLength = 0;
AUTIL_LOG(TRACE1, "GetFileMeta [%s]", fileName.c_str());
fslib::ErrorCode ec = fslib::fs::FileSystem::getFileMeta(fileName, fileMeta);
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(WARN, "get file len failed, file not exist, file[%s]", fileName.c_str());
return FSEC_NOENT;
} else if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "get file meta failed, file[%s], ec[%d]", fileName.c_str(), ec);
return FSEC_ERROR;
}
fileLength = fileMeta.fileLength;
return FSEC_OK;
}
FSResult<void> FslibWrapper::GetDirSize(const string& path, size_t& dirSize) noexcept
{
fslib::FileList fileList;
auto ret = ListDirRecursive(path, fileList);
if (!ret.OK()) {
AUTIL_LOG(WARN, "list path[%s] failed", path.c_str());
return FSEC_ERROR;
}
dirSize = 0;
for (const auto& fileName : fileList) {
bool isDir = *fileName.rbegin() == '/';
string filePath = PathUtil::JoinPath(path, fileName);
if (!isDir) {
auto [ec, fileLength] = GetFileLength(filePath);
if (ec != FSEC_OK) {
return FSEC_ERROR;
}
dirSize += fileLength;
}
}
return FSEC_OK;
}
bool FslibWrapper::NeedMkParentDirBeforeOpen(const string& path) noexcept
{
return fslib::fs::FileSystem::getFsType(path) == FSLIB_FS_LOCAL_FILESYSTEM_NAME;
}
FSResult<void> FslibWrapper::Link(const string& srcPath, const string& dstPath) noexcept
{
fslib::ErrorCode ec = fslib::fs::FileSystem::link(srcPath, dstPath);
if (ec == fslib::EC_NOENT) {
AUTIL_LOG(WARN, "link [%s] to [%s] failed, src not exist", srcPath.c_str(), dstPath.c_str());
return FSEC_NOENT;
} else if (ec == fslib::EC_EXIST) {
AUTIL_LOG(WARN, "link [%s] to [%s] failed, dst exist", srcPath.c_str(), dstPath.c_str());
return FSEC_EXIST;
} else if (ec == fslib::EC_NOTSUP) {
AUTIL_LOG(ERROR, "not support link for src path[%s] to dst path[%s]", srcPath.c_str(), dstPath.c_str());
return FSEC_NOTSUP;
} else if (ec != fslib::EC_OK) {
AUTIL_LOG(ERROR, "link [%s] to [%s] failed, ec[%d]", srcPath.c_str(), dstPath.c_str(), ec);
return FSEC_ERROR;
}
return FSEC_OK;
}
FSResult<void> FslibWrapper::SymLink(const string& srcPath, const string& dstPath) noexcept
{
if (fslib::fs::FileSystem::getFsType(srcPath) != FSLIB_FS_LOCAL_FILESYSTEM_NAME) {
AUTIL_LOG(DEBUG, "not support make symlink for dfs path[%s]", srcPath.c_str());
return FSEC_NOTSUP;
}
int ret = symlink(srcPath.c_str(), dstPath.c_str());
if (ret < 0) {
AUTIL_LOG(WARN, "symlink src [%s] to dst [%s] fail, %s.", srcPath.c_str(), dstPath.c_str(), strerror(errno));
return FSEC_ERROR;
}
return FSEC_OK;
}
const autil::ThreadPoolPtr& FslibWrapper::GetReadThreadPool() noexcept
{
ScopedLock lock(_readPoolLock);
if (_readPool) {
return _readPool;
}
InitThreadPool(_readPool, _mergeIOConfig.readThreadNum, _mergeIOConfig.readQueueSize, "indexFSWRead");
return _readPool;
}
const autil::ThreadPoolPtr& FslibWrapper::GetWriteThreadPool() noexcept
{
ScopedLock lock(_writePoolLock);
if (_writePool) {
return _writePool;
}
InitThreadPool(_writePool, _mergeIOConfig.writeThreadNum, _mergeIOConfig.writeQueueSize, "indexFSWWrite");
return _writePool;
}
void FslibWrapper::InitThreadPool(ThreadPoolPtr& threadPool, uint32_t threadNum, uint32_t queueSize,
const string& name) noexcept
{
assert(!threadPool);
threadPool.reset(new ThreadPool(threadNum, queueSize, true));
threadPool->start(name);
}
FSResult<void> FslibWrapper::UpdatePanguInlineFile(const string& path, const string& content) noexcept
{
constexpr const char* UPDATE_INLINE_COMMAND = "pangu_UpdateInlinefile";
static string out;
if (autil::EnvUtil::hasEnv("INDEXLIB_TEST_MOCK_PANGU_INLINE_FILE")) {
ScopedLock lock(TEST_panguLock);
auto ec = DeleteFile(path, DeleteOption::NoFence(true)).Code();
if (ec != FSEC_OK) {
return ec;
}
return Store(path, content);
}
fslib::ErrorCode ec = fslib::fs::FileSystem::forward(UPDATE_INLINE_COMMAND, path, content, out);
if (ec == fslib::EC_OK) {
return FSEC_OK;
} else if (ec == fslib::EC_NOTSUP) {
AUTIL_LOG(WARN, "update pangu inline file [%s] with content [%s] failed, ec[%d]", path.c_str(), content.c_str(),
ec);
return FSEC_NOTSUP;
} else {
AUTIL_LOG(WARN, "update pangu inline file [%s] with content [%s] failed, ec[%d]", path.c_str(), content.c_str(),
ec);
return FSEC_ERROR;
}
}
FSResult<void> FslibWrapper::UpdatePanguInlineFileCAS(const std::string& path, const std::string& oldContent,
const std::string& newContent) noexcept
{
constexpr const char* UPDATE_INLINE_COMMAND = "pangu_UpdateInlinefileCAS";
if (autil::EnvUtil::hasEnv("INDEXLIB_TEST_MOCK_PANGU_INLINE_FILE")) {
ScopedLock lock(TEST_panguLock);
auto ec = DeleteFile(path, DeleteOption::NoFence(true)).Code();
if (ec != FSEC_OK) {
return ec;
}
return Store(path, newContent);
}
static string out;
stringstream args;
args << oldContent << FENCE_ARGS_SEP << newContent;
fslib::ErrorCode ec = fslib::fs::FileSystem::forward(UPDATE_INLINE_COMMAND, path, args.str(), out);
if (ec == fslib::EC_OK) {
return FSEC_OK;
} else if (ec == fslib::EC_NOTSUP) {
AUTIL_LOG(WARN, "update pangu inline file [%s] with new content [%s] old content [%s] failed, ec[%d]",
path.c_str(), newContent.c_str(), oldContent.c_str(), ec);
return FSEC_NOTSUP;
} else if (ec == fslib::EC_PERMISSION) {
AUTIL_LOG(WARN, "update pangu inline file [%s] with new content [%s] old content [%s] failed, ec[%d]",
path.c_str(), newContent.c_str(), oldContent.c_str(), ec);
return FSEC_ERROR;
} else {
AUTIL_LOG(WARN, "update pangu inline file [%s] with new content [%s] old content [%s] failed, ec[%d]",
path.c_str(), newContent.c_str(), oldContent.c_str(), ec);
return FSEC_ERROR;
}
}
FSResult<void> FslibWrapper::CreatePanguInlineFile(const std::string& path) noexcept
{
if (autil::EnvUtil::hasEnv("INDEXLIB_TEST_MOCK_PANGU_INLINE_FILE")) {
ScopedLock lock(TEST_panguLock);
auto ec = Store(path, "").Code();
return ec;
}
constexpr const char* CREATE_INLINE_COMMAND = "pangu_CreateInlinefile";
static string content, out;
fslib::ErrorCode ec = fslib::fs::FileSystem::forward(CREATE_INLINE_COMMAND, path, content, out);
if (ec == fslib::EC_OK) {
return FSEC_OK;
} else if (ec == fslib::EC_NOTSUP) {
AUTIL_LOG(WARN, "create pangu inline file [%s] failed, ec[%d]", path.c_str(), ec);
return FSEC_NOTSUP;
} else if (ec == fslib::EC_EXIST) {
AUTIL_LOG(WARN, "create pangu inline file [%s] failed, ec[%d]", path.c_str(), ec);
return FSEC_EXIST;
} else {
AUTIL_LOG(WARN, "create pangu inline file [%s] failed, ec[%d]", path.c_str(), ec);
return FSEC_ERROR;
}
}
FSResult<void> FslibWrapper::StatPanguInlineFile(const std::string& path, string& out) noexcept
{
if (autil::EnvUtil::hasEnv("INDEXLIB_TEST_MOCK_PANGU_INLINE_FILE")) {
ScopedLock lock(TEST_panguLock);
return Load(path, out);
}
constexpr const char* STAT_INLINE_COMMAND = "pangu_StatInlinefile";
static string content;
fslib::ErrorCode ec = fslib::fs::FileSystem::forward(STAT_INLINE_COMMAND, path, content, out);
if (ec == fslib::EC_OK) {
return FSEC_OK;
} else if (ec == fslib::EC_NOTSUP) {
AUTIL_LOG(INFO, "stat pangu inline file [%s] failed, ec[%d]", path.c_str(), ec);
return FSEC_NOTSUP;
} else if (ec == fslib::EC_NOENT) {
AUTIL_LOG(WARN, "stat pangu inline file [%s] failed, ec[%d]", path.c_str(), ec);
return FSEC_NOENT;
} else {
AUTIL_LOG(WARN, "stat pangu inline file [%s] failed, ec[%d]", path.c_str(), ec);
return FSEC_ERROR;
}
}
FSResult<void> FslibWrapper::DeletePanguPathCAS(const std::string& path, const std::string& args) noexcept
{
if (autil::EnvUtil::hasEnv("INDEXLIB_TEST_MOCK_PANGU_INLINE_FILE")) {
ScopedLock lock(TEST_panguLock);
return Delete(path, FenceContext::NoFence());
}
constexpr const char* DELETE_WITH_INLINE_FILE = "pangu_DeleteWithInlinefile";
static string output;
auto fsEc = fslib::fs::FileSystem::forward(DELETE_WITH_INLINE_FILE, path, args, output);
if (fsEc == fslib::EC_OK) {
return FSEC_OK;
} else if (fsEc == fslib::EC_NOTSUP) {
assert(false);
return FSEC_NOTSUP;
} else if (fsEc == fslib::EC_PERMISSION) {
AUTIL_LOG(WARN, "delete file cas [%s] failed, ec[%d]", path.c_str(), fslib::EC_PERMISSION);
return FSEC_ERROR;
} else if (fsEc == fslib::EC_NOENT) {
AUTIL_LOG(DEBUG, "delete [%s] failed, no entry", path.c_str());
return FSEC_NOENT;
} else {
AUTIL_LOG(WARN, "delete pangu file [%s] failed, ec[%d], pangu ec [%d]", path.c_str(), FSEC_ERROR, fsEc);
return FSEC_ERROR;
}
}
FSResult<void> FslibWrapper::RenamePanguPathCAS(const std::string& srcName, const std::string& args) noexcept
{
if (autil::EnvUtil::hasEnv("INDEXLIB_TEST_MOCK_PANGU_INLINE_FILE")) {
ScopedLock lock(TEST_panguLock);
vector<string> argVec;
StringUtil::split(argVec, args, FENCE_ARGS_SEP, false);
assert(argVec.size() == 3);
string& destPath = argVec[0];
return Rename(srcName, destPath);
}
constexpr const char* RENAME_WITH_INLINE_FILE = "pangu_RenameWithInlinefile";
static string output;
auto ec = fslib::fs::FileSystem::forward(RENAME_WITH_INLINE_FILE, srcName, args, output);
vector<string> argVec;
StringUtil::split(argVec, args, FENCE_ARGS_SEP, false);
assert(argVec.size() == 3);
const string& destName = argVec[0];
if (ec == fslib::EC_OK) {
return FSEC_OK;
} else if (ec == fslib::EC_NOTSUP) {
assert(false);
return FSEC_NOTSUP;
} else if (ec == fslib::EC_PERMISSION) {
AUTIL_LOG(WARN, "src [%s] rename to dest [%s] failed, cas failed, ec[%d]", srcName.c_str(), destName.c_str(),
fslib::EC_PERMISSION);
return FSEC_ERROR;
} else if (ec == fslib::EC_NOENT) {
AUTIL_LOG(WARN, "rename [%s] to [%s] failed, src not exist", srcName.c_str(), destName.c_str());
return FSEC_NOENT;
} else if (ec == fslib::EC_EXIST) {
AUTIL_LOG(WARN, "rename [%s] to [%s] failed, dst exist", srcName.c_str(), destName.c_str());
return FSEC_EXIST;
} else {
AUTIL_LOG(WARN, "src [%s] rename to dest [%s] failed, ec[%d]", srcName.c_str(), destName.c_str(), FSEC_ERROR);
return FSEC_ERROR;
}
}
//////////////////////////////////////////////////////////////////////
FSResult<bool> FslibWrapper::IsExist(const string& filePath) noexcept
{
bool exist = false;
auto ec = IsExist(filePath, exist).Code();
return {ec, exist};
}
FSResult<bool> FslibWrapper::IsDir(const std::string& path) noexcept
{
bool ret = false;
auto ec = IsDir(path, ret).Code();
return {ec, ret};
}
FSResult<bool> FslibWrapper::IsFile(const std::string& path) noexcept
{
bool ret = false;
auto ec = IsFile(path, ret).Code();
return {ec, ret};
}
FSResult<unique_ptr<FslibFileWrapper>> FslibWrapper::OpenFile(const string& fileName, fslib::Flag mode,
bool useDirectIO, ssize_t fileLength) noexcept
{
FslibFileWrapper* fileWrapper = NULL;
auto ec = OpenFile(fileName, mode, useDirectIO, fileLength, fileWrapper).Code();
return {ec, unique_ptr<FslibFileWrapper>(fileWrapper)};
}
FSResult<unique_ptr<fslib::fs::MMapFile>> FslibWrapper::MmapFile(const string& fileName, fslib::Flag openMode,
char* start, int64_t length, int prot, int mapFlag,
off_t offset, ssize_t fileLength) noexcept
{
fslib::fs::MMapFile* mmapFile = NULL;
auto ec = MmapFile(fileName, openMode, start, length, prot, mapFlag, offset, fileLength, mmapFile).Code();
return {ec, unique_ptr<fslib::fs::MMapFile>(mmapFile)};
}
FSResult<size_t> FslibWrapper::GetFileLength(const string& fileName) noexcept
{
int64_t fileLength = 0;
auto ec = GetFileLength(fileName, fileLength).Code();
return {ec, (size_t)fileLength};
}
FSResult<size_t> FslibWrapper::GetDirSize(const std::string& dirPath) noexcept
{
size_t dirSize = 0;
auto ec = GetDirSize(dirPath, dirSize).Code();
return {ec, dirSize};
}
FSResult<fslib::FileMeta> FslibWrapper::GetFileMeta(const std::string& fileName) noexcept
{
fslib::FileMeta fileMeta;
auto ec = GetFileMeta(fileName, fileMeta);
return {ec, fileMeta};
}
FSResult<fslib::PathMeta> FslibWrapper::GetPathMeta(const std::string& path) noexcept
{
fslib::PathMeta pathMeta;
auto ec = GetPathMeta(path, pathMeta).Code();
return {ec, pathMeta};
}
void FslibWrapper::MkDirE(const string& dirPath, bool recursive, bool mayExist) noexcept(false)
{
auto ec = MkDir(dirPath, recursive, mayExist).Code();
if (ec != FSEC_OK) {
INDEXLIB_FATAL_ERROR(FileIO, "make directory [%s] FAILED", dirPath.c_str());
}
}
void FslibWrapper::DeleteFileE(const string& filePath, const DeleteOption& deleteOption) noexcept(false)
{
auto ec = DeleteFile(filePath, deleteOption).Code();
if (ec == FSEC_NOENT) {
INDEXLIB_FATAL_ERROR(NonExist, "delete file FAILED, [%s] not exist.", filePath.c_str());
} else if (unlikely(ec != FSEC_OK)) {
INDEXLIB_FATAL_ERROR(FileIO, "delete file [%s] FAILED", filePath.c_str());
}
}
void FslibWrapper::DeleteDirE(const string& dirPath, const DeleteOption& deleteOption) noexcept(false)
{
auto ec = DeleteDir(dirPath, deleteOption).Code();
if (ec == FSEC_NOENT) {
INDEXLIB_FATAL_ERROR(NonExist, "dir [%s] not exist.", dirPath.c_str());
} else if (unlikely(ec != FSEC_OK)) {
INDEXLIB_FATAL_ERROR(FileIO, "delete dir [%s] FAILED", dirPath.c_str());
}
}
void FslibWrapper::RenameE(const string& srcName, const string& dstName) noexcept(false)
{
auto ec = Rename(srcName, dstName).Code();
if (ec == FSEC_NOENT) {
INDEXLIB_THROW_WARN(NonExist, "source file [%s] not exist.", srcName.c_str());
} else if (ec == FSEC_EXIST) {
INDEXLIB_FATAL_ERROR(Exist, "dest file [%s] already exist.", dstName.c_str());
} else if (ec != FSEC_OK) {
INDEXLIB_FATAL_ERROR(FileIO, "rename [%s] to [%s] FAILED", srcName.c_str(), dstName.c_str());
}
}
void FslibWrapper::AtomicStoreE(const std::string& filePath, const std::string& fileContent) noexcept(false)
{
auto ec = AtomicStore(filePath, fileContent.data(), fileContent.size()).Code();
if (ec == FSEC_EXIST) {
INDEXLIB_FATAL_ERROR(Exist, "file [%s] already exist.", filePath.c_str());
} else if (ec != FSEC_OK) {
INDEXLIB_FATAL_ERROR(FileIO, "store file [%s] FAILED", filePath.c_str());
}
}
void FslibWrapper::AtomicLoadE(const std::string& filePath, std::string& fileContent) noexcept(false)
{
auto ec = AtomicLoad(filePath, fileContent).Code();
if (ec == FSEC_NOENT) {
INDEXLIB_FATAL_ERROR(NonExist, "file [%s] not exist.", filePath.c_str());
} else if (ec != FSEC_OK) {
INDEXLIB_FATAL_ERROR(FileIO, "load file [%s] FAILED", filePath.c_str());
}
}
void FslibWrapper::ListDirE(const string& dirPath, fslib::FileList& fileList) noexcept(false)
{
auto ec = ListDir(dirPath, fileList).Code();
if (ec == FSEC_NOENT) {
INDEXLIB_THROW_WARN(NonExist, "Directory [%s] not exist.", dirPath.c_str());
} else if (ec == FSEC_NOTDIR) {
INDEXLIB_FATAL_ERROR(InconsistentState, "Path [%s] is not a dir.", dirPath.c_str());
} else if (ec != FSEC_OK) {
INDEXLIB_FATAL_ERROR(FileIO, "List directory [%s] FAILED", dirPath.c_str());
}
}
FenceContext* FslibWrapper::CreateFenceContext(const std::string& fenceHintPath, const std::string& epochId) noexcept
{
bool needFence = false;
bool usePangu = false;
if (epochId.empty() || autil::EnvUtil::getEnv("INDEXLIB_FENCE_OPERATE_ROOT", "true") == "false") {
return FenceContext::NoFence();
}
// for test
if (autil::EnvUtil::getEnv("INDEXLIB_TEST_MOCK_PANGU_INLINE_FILE", "false") == "true") {
needFence = true;
usePangu = true;
} else {
string out;
constexpr const char* SUPPORT_FENCE_COMMAND = "pangu_SupportAtomicRename";
auto ec = fslib::fs::FileSystem::forward(SUPPORT_FENCE_COMMAND, fenceHintPath, "", out);
assert(ec == fslib::EC_OK || ec == fslib::EC_NOTSUP);
needFence = (ec == fslib::EC_OK);
usePangu = (out == "pangu");
}
return needFence ? FenceContext::Fence(usePangu, epochId, fenceHintPath) : FenceContext::NoFence();
}
std::string FslibWrapper::JoinPath(const std::string& path, const std::string& name) noexcept
{
return PathUtil::JoinPath(path, name);
}
std::string FslibWrapper::GetErrorString(fslib::ErrorCode ec) noexcept
{
return fslib::fs::FileSystem::getErrorString(ec);
}
}} // namespace indexlib::file_system