core/file_server/polling/PollingDirFile.cpp (635 lines of code) (raw):
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "file_server/polling/PollingDirFile.h"
#if defined(__linux__)
#include <fnmatch.h>
#include <sys/file.h>
#elif defined(_MSC_VER)
#include <Shlwapi.h>
#endif
#include <sys/stat.h>
#include "app_config/AppConfig.h"
#include "common/ErrorUtil.h"
#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/FileServer.h"
#include "file_server/event/Event.h"
#include "file_server/polling/PollingEventQueue.h"
#include "file_server/polling/PollingModify.h"
#include "logger/Logger.h"
#include "monitor/AlarmManager.h"
#include "monitor/metric_constants/MetricConstants.h"
// Run ClearUnavailableFileAndDir once every N polling rounds (tested as mCurrentRound % N == 0).
DEFINE_FLAG_INT32(check_not_exist_file_dir_round, "clear not exist file dir cache, round", 20);
// A cached file/dir whose last check round lags the current round by more than this many
// rounds is considered unavailable and is erased by ClearUnavailableFileAndDir.
DEFINE_FLAG_INT32(delete_dir_file_round, "delete not exist file dir, round", 100);
// Pause between two polling iterations, in milliseconds (slept in 10 slices by Polling()).
#if defined(__linux__)
DEFINE_FLAG_INT32(dirfile_check_interval_ms, "dir file check interval, ms", 5000);
#elif defined(_MSC_VER)
// Windows only supports polling, check more frequently.
DEFINE_FLAG_INT32(dirfile_check_interval_ms, "dir file check interval, ms", 1000);
#endif
// Throttling: every time mStatCount reaches a multiple of dirfile_stat_count, the scanning
// loop sleeps for dirfile_stat_sleep milliseconds.
DEFINE_FLAG_INT32(dirfile_stat_count, "sleep when dir file stat count up to", 100);
DEFINE_FLAG_INT32(dirfile_stat_sleep, "sleep time when dir file stat up to 1000, ms", 30);
// When a cache map grows beyond its upper limit, ClearTimeoutFileAndDir erases the entries
// whose last modify time is older than the matching timeout (seconds).
DEFINE_FLAG_INT32(polling_dir_upperlimit, "try to remove unchanged dir if dir count is up to", 500000);
DEFINE_FLAG_INT32(polling_file_upperlimit, "try to remove unchanged file if file count is up to", 500000);
DEFINE_FLAG_INT32(polling_dir_timeout, "remove unchanged dir if modify time is older than time", 12 * 3600);
DEFINE_FLAG_INT32(polling_file_timeout, "remove unchanged file if modify time is older than time", 12 * 3600);
// A dir/file seen for the first time is treated as old data (no event is generated) when its
// last write time is older than this many seconds.
DEFINE_FLAG_INT32(polling_dir_first_watch_timeout, "do not post event if modify time is up to", 3 * 3600);
DEFINE_FLAG_INT32(polling_file_first_watch_timeout, "do not post event if modify time is up to", 3 * 3600);
// Minimum number of seconds between two effective runs of ClearTimeoutFileAndDir.
DEFINE_FLAG_INT32(polling_check_timeout_interval, " ", 600);
// Stat budgets for one polling round: in total, per scanned directory and per config.
DEFINE_FLAG_INT32(polling_max_stat_count, "max stat count in each round", 1000000);
DEFINE_FLAG_INT32(polling_max_stat_count_per_dir, "max stat count per dir in each round", 100000);
DEFINE_FLAG_INT32(polling_max_stat_count_per_config, "max stat count per config in each round", 100000);
// An already cached, recently written file is pushed to PollingModify again once at least this
// many seconds have passed since its last event.
DEFINE_FLAG_INT32(polling_modify_repush_interval, "polling modify event repush interval, seconds", 10);
// Defined elsewhere; caps the number of sub directories examined per directory while a
// wildcard path part is being matched.
DECLARE_FLAG_INT32(wildcard_max_sub_dir_count);
using namespace std;
namespace logtail {
// Nanoseconds per second. Cached modify times are stored as sec * NANO_CONVERTING + nsec.
static const int64_t NANO_CONVERTING = 1000000000;
// Prepares the instance and launches the polling thread: calls ClearCache(), creates the two
// cache-size gauges on FileServer's metrics record, raises the running flag and finally
// spawns the thread that executes Polling().
void PollingDirFile::Start() {
    ClearCache();

    // Both gauges are refreshed at the beginning of every polling iteration.
    auto&& metricsRecord = FileServer::GetInstance()->GetMetricsRecordRef();
    mPollingDirCacheSize = metricsRecord.CreateIntGauge(METRIC_RUNNER_FILE_POLLING_DIR_CACHE_SIZE);
    mPollingFileCacheSize = metricsRecord.CreateIntGauge(METRIC_RUNNER_FILE_POLLING_FILE_CACHE_SIZE);

    // The flag must be set before the thread starts, because Polling() loops on it.
    mRuningFlag = true;
    mThreadPtr = CreateThread([this]() { Polling(); });
}
// Requests the polling thread to exit and waits for it.
// A failure while waiting is only logged; the "stop" log line is always emitted.
void PollingDirFile::Stop() {
    // Polling() and the scanning loops test this flag and bail out once it is cleared.
    mRuningFlag = false;

    if (mThreadPtr) {
        try {
            // NOTE(review): the argument looks like a timeout in microseconds (5s) - confirm.
            mThreadPtr->Wait(5 * 1000000);
        } catch (...) {
            LOG_ERROR(sLogger,
                      ("stop polling dir file thread failed", ToString(static_cast<int>(mThreadPtr->GetState()))));
        }
    }

    LOG_INFO(sLogger, ("PollingDirFile", "stop"));
}
// Pauses discovery. Raises mHoldOnFlag first so that a scan in progress breaks out of its
// loops (they test this flag), then blocks until mPollingThreadLock is acquired.
// PollingIteration() holds that lock for its whole duration, so once this function returns no
// iteration is running and none can start. The lock is still held on return: every call must be
// paired with Resume().
void PollingDirFile::HoldOn() {
LOG_INFO(sLogger, ("polling discovery pause", "starts"));
mHoldOnFlag = true;
mPollingThreadLock.lock();
LOG_INFO(sLogger, ("polling discovery pause", "succeeded"));
}
// Resumes discovery after HoldOn(): clears mHoldOnFlag and releases mPollingThreadLock so that
// the next PollingIteration() can proceed.
void PollingDirFile::Resume() {
LOG_INFO(sLogger, ("polling discovery resume", "starts"));
mHoldOnFlag = false;
mPollingThreadLock.unlock();
LOG_INFO(sLogger, ("polling discovery resume", "succeeded"));
}
// Logs a warning and sends a STAT_LIMIT_ALARM when the number of stats spent on one config
// (or on one container base path of a config) exceeds polling_max_stat_count_per_config.
//
// lastStatCount: value of mStatCount sampled right before the config was polled.
// config: the (options, pipeline context) pair that was just polled; provides the base path
//     and the alarm labels.
// isDockerConfig: only changes the wording of the message ("docker config" vs "config").
void PollingDirFile::CheckConfigPollingStatCount(const int32_t lastStatCount,
                                                 const FileDiscoveryConfig& config,
                                                 bool isDockerConfig) {
    const auto spentOnConfig = mStatCount - lastStatCount;
    if (spentOnConfig > INT32_FLAG(polling_max_stat_count_per_config)) {
        std::string msgBase("The polling stat count of this ");
        msgBase.append(isDockerConfig ? "docker " : "");
        msgBase.append("config has exceeded limit");

        const auto& options = config.first;
        const auto& ctx = config.second;
        LOG_WARNING(sLogger,
                    (msgBase, spentOnConfig)(options->GetBasePath(), mStatCount)(ctx->GetProjectName(),
                                                                                ctx->GetLogstoreName()));

        std::string alarmMsg = msgBase;
        alarmMsg += ", current count: ";
        alarmMsg += ToString(spentOnConfig);
        alarmMsg += " total count:";
        alarmMsg += ToString(mStatCount);
        alarmMsg += " path: ";
        alarmMsg += options->GetBasePath();
        AlarmManager::GetInstance()->SendAlarm(STAT_LIMIT_ALARM,
                                               alarmMsg,
                                               ctx->GetRegion(),
                                               ctx->GetProjectName(),
                                               ctx->GetConfigName(),
                                               ctx->GetLogstoreName());
    }
}
// Entry point of the polling thread: runs polling iterations until mRuningFlag is cleared.
void PollingDirFile::Polling() {
    LOG_INFO(sLogger, ("polling discovery", "started"));
    mHoldOnFlag = false;

    while (mRuningFlag) {
        PollingIteration();

        // Wait dirfile_check_interval_ms in total (by default, 5s on Linux, 1s on Windows).
        // The wait is cut into 10 slices so that a stop request is noticed after one slice.
        int sleptSlices = 0;
        while (sleptSlices < 10 && mRuningFlag) {
            usleep(INT32_FLAG(dirfile_check_interval_ms) * 100);
            ++sleptSlices;
        }
    }

    LOG_DEBUG(sLogger, ("dir file polling thread done", ""));
}
// Runs one polling round while holding mPollingThreadLock:
//   1. resets the per-round state (mStatCount, mNewFileVec) and advances mCurrentRound;
//   2. takes a copy of the discovery configs and splits it into normal and wildcard configs;
//   3. publishes the cache sizes to the gauges;
//   4. scans every config (every container base dir when container discovery is enabled),
//      stopping early when stopped, held on, or when the total stat budget is exhausted;
//   5. hands the newly collected files to PollingModify;
//   6. evicts unavailable and timed out cache entries.
void PollingDirFile::PollingIteration() {
LOG_DEBUG(sLogger, ("start dir file polling, mCurrentRound", mCurrentRound));
// Held until the end of the iteration; HoldOn() blocks on the same lock.
PTScopedLock threadLock(mPollingThreadLock);
mStatCount = 0;
mNewFileVec.clear();
++mCurrentRound;
// Get a copy of config list from FileServer.
// PollingDirFile has to be held on at first because raw pointers are used here.
vector<FileDiscoveryConfig> sortedConfigs;
vector<FileDiscoveryConfig> wildcardConfigs;
auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();
for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) {
// Configs without wildcard paths are "normal" configs.
if (itr->second.first->GetWildcardPaths().empty())
sortedConfigs.push_back(itr->second);
else
wildcardConfigs.push_back(itr->second);
}
sort(sortedConfigs.begin(), sortedConfigs.end(), FileDiscoveryOptions::CompareByPathLength);
LoongCollectorMonitor::GetInstance()->SetAgentConfigTotal(nameConfigMap.size());
{
// The cache maps are read under mCacheLock.
ScopedSpinLock lock(mCacheLock);
SET_GAUGE(mPollingDirCacheSize, mDirCacheMap.size());
SET_GAUGE(mPollingFileCacheSize, mFileCacheMap.size());
}
// Iterate all normal configs, make sure stat count will not exceed limit.
for (auto itr = sortedConfigs.begin();
itr != sortedConfigs.end() && mStatCount <= INT32_FLAG(polling_max_stat_count);
++itr) {
if (!mRuningFlag || mHoldOnFlag)
break;
const FileDiscoveryOptions* config = itr->first;
const CollectionPipelineContext* ctx = itr->second;
if (!config->IsContainerDiscoveryEnabled()) {
// Host config: scan from the configured base path.
fsutil::PathStat baseDirStat;
if (!fsutil::PathStat::stat(config->GetBasePath(), baseDirStat)) {
LOG_DEBUG(sLogger,
("get base dir info error: ", config->GetBasePath())(ctx->GetProjectName(),
ctx->GetLogstoreName()));
continue;
}
int32_t lastConfigStatCount = mStatCount;
if (!PollingNormalConfigPath(*itr, config->GetBasePath(), string(), baseDirStat, 0)) {
LOG_DEBUG(sLogger,
("logPath in config not exist", config->GetBasePath())(ctx->GetProjectName(),
ctx->GetLogstoreName()));
}
CheckConfigPollingStatCount(lastConfigStatCount, *itr, false);
} else {
// Container config: scan the real base dir of every container; the per-config stat
// check is applied to each container separately.
for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) {
const string& basePath = (*config->GetContainerInfo())[i].mRealBaseDir;
fsutil::PathStat baseDirStat;
if (!fsutil::PathStat::stat(basePath, baseDirStat)) {
LOG_DEBUG(
sLogger,
("get docker base dir info error: ", basePath)(ctx->GetProjectName(), ctx->GetLogstoreName()));
continue;
}
int32_t lastConfigStatCount = mStatCount;
if (!PollingNormalConfigPath(*itr, basePath, string(), baseDirStat, 0)) {
LOG_DEBUG(sLogger,
("docker logPath in config not exist", basePath)(ctx->GetProjectName(),
ctx->GetLogstoreName()));
}
CheckConfigPollingStatCount(lastConfigStatCount, *itr, true);
}
}
}
// Iterate all wildcard configs, make sure stat count will not exceed limit.
for (auto itr = wildcardConfigs.begin();
itr != wildcardConfigs.end() && mStatCount <= INT32_FLAG(polling_max_stat_count);
++itr) {
if (!mRuningFlag || mHoldOnFlag)
break;
const FileDiscoveryOptions* config = itr->first;
const CollectionPipelineContext* ctx = itr->second;
if (!config->IsContainerDiscoveryEnabled()) {
// Matching starts from the first wildcard path part.
int32_t lastConfigStatCount = mStatCount;
if (!PollingWildcardConfigPath(*itr, config->GetWildcardPaths()[0], 0)) {
LOG_DEBUG(sLogger,
("can not find matched path in config, Wildcard begin logPath",
config->GetBasePath())(ctx->GetProjectName(), ctx->GetLogstoreName()));
}
CheckConfigPollingStatCount(lastConfigStatCount, *itr, false);
} else {
// For containers, matching starts from the real base dir of each container.
for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) {
const string& baseWildcardPath = (*config->GetContainerInfo())[i].mRealBaseDir;
int32_t lastConfigStatCount = mStatCount;
if (!PollingWildcardConfigPath(*itr, baseWildcardPath, 0)) {
LOG_DEBUG(sLogger,
("can not find matched path in config, "
"Wildcard begin logPath ",
baseWildcardPath)(ctx->GetProjectName(), ctx->GetLogstoreName()));
}
CheckConfigPollingStatCount(lastConfigStatCount, *itr, true);
}
}
}
// Add collected new files to PollingModify.
PollingModify::GetInstance()->AddNewFile(mNewFileVec);
// Check cache, clear unavailable and overtime items.
// Unavailable items are only checked every check_not_exist_file_dir_round rounds, while
// ClearTimeoutFileAndDir is called every round and rate-limits itself.
if (mCurrentRound % INT32_FLAG(check_not_exist_file_dir_round) == 0) {
ClearUnavailableFileAndDir();
}
ClearTimeoutFileAndDir();
}
// Refreshes the cache entry of a directory, creating it when the directory is seen for the
// first time.
//
// About the Last Modified Time (LMD) of a directory: it changes when a file or a subdirectory
// is added, removed or renamed directly inside it. Modifying the content of a file inside it
// does not update LMD, and neither does adding/removing/renaming anything in one of its
// subdirectories.
// NOTE: So, changes in subdirectories can not be detected from the LMD of the directory.
//
// dirPath: cache key.
// statBuf: stat result of the directory; supplies the last write time.
// exceedPreservedDirDepth: stored on a newly created entry.
// newFlag (out): true only when the entry has just been created, the current round is not the
//     first one and the directory was written within polling_dir_first_watch_timeout seconds.
//
// Always returns true.
bool PollingDirFile::CheckAndUpdateDirMatchCache(const string& dirPath,
                                                 const fsutil::PathStat& statBuf,
                                                 bool exceedPreservedDirDepth,
                                                 bool& newFlag) {
    int64_t writeSec, writeNsec;
    statBuf.GetLastWriteTime(writeSec, writeNsec);
    const int64_t writeTimeNs = NANO_CONVERTING * writeSec + writeNsec;

    ScopedSpinLock lock(mCacheLock);
    auto cached = mDirCacheMap.find(dirPath);
    if (cached != mDirCacheMap.end()) {
        // Known directory: only refresh the last check round and the modify time.
        newFlag = false;
        cached->second.SetCheckRound(mCurrentRound);
        cached->second.SetLastModifyTime(writeTimeNs);
        return true;
    }

    // Unknown directory: add a cache item for it.
    DirFileCache& entry = mDirCacheMap[dirPath];
    entry.SetConfigMatched(true);
    entry.SetExceedPreservedDirDepth(exceedPreservedDirDepth);
    entry.SetCheckRound(mCurrentRound);
    entry.SetLastModifyTime(writeTimeNs);

    // Directories found at round 1 or written too long ago are considered as old data.
    const auto now = static_cast<int32_t>(time(NULL));
    const bool isOldData = mCurrentRound == 1 || now - writeSec > INT32_FLAG(polling_dir_first_watch_timeout);
    newFlag = !isOldData;
    if (newFlag) {
        entry.SetLastEventTime(now);
    }
    entry.SetEventFlag(newFlag);
    return true;
}
// Looks up (or creates) the cache entry of a regular file and decides whether the file has to
// be handed to PollingModify in this round.
//
// fileDir / fileName: location of the file; the cache key is PathJoin(fileDir, fileName).
// statBuf: stat result of the file; supplies the last write time.
// needFindBestMatch: when true and the file is not cached yet, ConfigManager::FindBestMatch is
//     consulted to decide whether a config matches it; when false the new entry is recorded as
//     matched (the caller has already checked).
// exceedPreservedDirDepth: stored on a newly created entry.
//
// Returns true when the entry has a matched config AND the file is flagged as new (first seen
// recently, or due for a repush) in this round.
//
// Thread safety: mCacheLock is held during the whole lookup/update, like every other access
// to mFileCacheMap / mDirCacheMap in this file.
bool PollingDirFile::CheckAndUpdateFileMatchCache(const string& fileDir,
                                                  const string& fileName,
                                                  const fsutil::PathStat& statBuf,
                                                  bool needFindBestMatch,
                                                  bool exceedPreservedDirDepth) {
    int64_t sec, nsec;
    statBuf.GetLastWriteTime(sec, nsec);
    int64_t modifyTime = NANO_CONVERTING * sec + nsec;
    bool newFlag = false;
    string filePath = PathJoin(fileDir, fileName);

    // Take the cache lock before touching mFileCacheMap; the iterator and the references used
    // below stay valid only while nobody else mutates the map.
    ScopedSpinLock lock(mCacheLock);
    FileCheckCacheMap::iterator iter = mFileCacheMap.find(filePath);
    int32_t curTime = static_cast<int32_t>(time(NULL));
    if (iter == mFileCacheMap.end()) {
        bool matchFlag = needFindBestMatch
            ? ConfigManager::GetInstance()->FindBestMatch(fileDir, fileName).first != nullptr
            : true;
        DirFileCache& fileCache = mFileCacheMap[filePath];
        fileCache.SetConfigMatched(matchFlag);
        fileCache.SetExceedPreservedDirDepth(exceedPreservedDirDepth);
        fileCache.SetCheckRound(mCurrentRound);
        fileCache.SetLastModifyTime(modifyTime);
        // Files found at round 1 or too old are considered as old data.
        if (mCurrentRound == 1 || curTime - sec > INT32_FLAG(polling_file_first_watch_timeout)) {
            newFlag = false;
        } else {
            newFlag = true;
            fileCache.SetLastEventTime(curTime);
        }
        fileCache.SetEventFlag(newFlag);
        return matchFlag && newFlag;
    }

    // If the file is not overtime, repush it to PollingModify thread regularly (by default, 10s).
    // Mainly for case that file is deleted and recreated after a while.
    // In detail, it can avoid data missing when following things happen:
    // 1. File is created, and PollingDirFile add it to cache and push it to PollingModify.
    // 2. File is deleted, PollingModify removes it.
    // 3. File is recreated, because PollingDirFile already caches it, new flag will
    //    not be set.
    // 4. Now, PollingModify will not generate MODIFY event for the file because the file
    //    is not existing in polling file list. **We lose the file**.
    if ((curTime - sec < INT32_FLAG(polling_file_first_watch_timeout))
        && (!iter->second.HasEventFlag()
            || (curTime - iter->second.GetLastEventTime() >= INT32_FLAG(polling_modify_repush_interval)))) {
        newFlag = true;
        iter->second.SetEventFlag(newFlag);
        iter->second.SetLastEventTime(curTime);
    }
    iter->second.SetCheckRound(mCurrentRound);
    iter->second.SetLastModifyTime(modifyTime);
    return iter->second.HasMatchedConfig() && newFlag;
}
// Recursively scans one directory on behalf of a config.
//
// pConfig: the (options, pipeline context) pair being polled.
// srcPath / obj: the scanned directory is srcPath when obj is empty, PathJoin(srcPath, obj)
//     otherwise.
// statBuf: stat result of that directory, obtained by the caller.
// depth: recursion depth, 0 for the directory the scan starts from.
//
// Effects: pushes an EVENT_CREATE | EVENT_ISDIR event for a directory flagged as new by the
// directory cache, updates the file cache and appends the files to hand over to PollingModify
// to mNewFileVec.
//
// Returns false when the directory is skipped: deeper than mMaxDirSearchDepth, deeper than
// mPreservedDirDepth and not written within timeout_interval seconds, matching the host path
// blacklist, or not existing when opened. Returns true otherwise, including when the directory
// can not be opened for another reason and when the scan stops early.
bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig,
const string& srcPath,
const string& obj,
const fsutil::PathStat& statBuf,
int depth) {
// A negative mMaxDirSearchDepth disables the depth limit.
if (pConfig.first->mMaxDirSearchDepth >= 0 && depth > pConfig.first->mMaxDirSearchDepth) {
return false;
}
// Directories below the preserved depth are only scanned while they are recently written.
bool exceedPreservedDirDepth = false;
if (pConfig.first->mPreservedDirDepth >= 0 && depth > pConfig.first->mPreservedDirDepth) {
exceedPreservedDirDepth = true;
int64_t sec = 0;
int64_t nsec = 0;
statBuf.GetLastWriteTime(sec, nsec);
auto curTime = time(nullptr);
LOG_DEBUG(sLogger, ("PollingNormalConfigPath", srcPath + "/" + obj)("curTime", curTime)("writeTime", sec));
if (curTime - sec > INT32_FLAG(timeout_interval)) {
return false;
}
}
string dirPath = obj.empty() ? srcPath : PathJoin(srcPath, obj);
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(dirPath)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", dirPath));
return false;
}
bool isNewDirectory = false;
if (!CheckAndUpdateDirMatchCache(dirPath, statBuf, exceedPreservedDirDepth, isNewDirectory))
return true;
if (isNewDirectory) {
PollingEventQueue::GetInstance()->PushEvent(new Event(srcPath, obj, EVENT_CREATE | EVENT_ISDIR, -1, 0));
}
// Iterate directories and files in dirPath.
fsutil::Dir dir(dirPath);
if (!dir.Open()) {
auto err = GetErrno();
if (fsutil::Dir::IsENOENT(err)) {
LOG_DEBUG(sLogger, ("Open dir error, ENOENT, dir", dirPath.c_str()));
return false;
} else {
AlarmManager::GetInstance()->SendAlarm(LOGDIR_PERMISSION_ALARM,
string("Failed to open dir : ") + dirPath
+ ";\terrno : " + ToString(err),
pConfig.second->GetRegion(),
pConfig.second->GetProjectName(),
pConfig.second->GetConfigName(),
pConfig.second->GetLogstoreName());
LOG_ERROR(sLogger, ("Open dir error", dirPath.c_str())("error", ErrnoToString(err)));
}
return true;
}
// Number of entries examined in this directory only; mStatCount counts the whole round.
int32_t nowStatCount = 0;
fsutil::Entry ent;
while ((ent = dir.ReadNext(false))) {
if (!mRuningFlag || mHoldOnFlag)
break;
// Throttle: sleep dirfile_stat_sleep ms every dirfile_stat_count entries.
if (++mStatCount % INT32_FLAG(dirfile_stat_count) == 0) {
usleep(INT32_FLAG(dirfile_stat_sleep) * 1000);
}
// Budget of the whole round.
if (mStatCount > INT32_FLAG(polling_max_stat_count)) {
LOG_WARNING(sLogger,
("total dir's polling stat count is exceeded", nowStatCount)(dirPath, mStatCount)(
pConfig.second->GetProjectName(), pConfig.second->GetLogstoreName()));
AlarmManager::GetInstance()->SendAlarm(
STAT_LIMIT_ALARM,
string("total dir's polling stat count is exceeded, now count:") + ToString(nowStatCount)
+ " total count:" + ToString(mStatCount) + " path: " + dirPath
+ " project:" + pConfig.second->GetProjectName() + " logstore:" + pConfig.second->GetLogstoreName(),
pConfig.second->GetRegion(),
pConfig.second->GetProjectName(),
pConfig.second->GetConfigName(),
pConfig.second->GetLogstoreName());
break;
}
// Budget of this directory.
if (++nowStatCount > INT32_FLAG(polling_max_stat_count_per_dir)) {
LOG_WARNING(sLogger,
("this dir's polling stat count is exceeded", nowStatCount)(dirPath, mStatCount)(
pConfig.second->GetProjectName(), pConfig.second->GetLogstoreName()));
AlarmManager::GetInstance()->SendAlarm(
STAT_LIMIT_ALARM,
string("this dir's polling stat count is exceeded, now count:") + ToString(nowStatCount)
+ " total count:" + ToString(mStatCount) + " path: " + dirPath
+ " project:" + pConfig.second->GetProjectName() + " logstore:" + pConfig.second->GetLogstoreName(),
pConfig.second->GetRegion(),
pConfig.second->GetProjectName(),
pConfig.second->GetConfigName(),
pConfig.second->GetLogstoreName());
break;
}
// If the directory entry already tells that the item is a plain directory or a regular
// file, decide right away whether it can be skipped (IsDirectoryInBlacklist for a
// directory, FindBestMatch for a file) and remember that the check has been done.
auto entName = ent.Name();
string item = PathJoin(dirPath, entName);
bool needCheckDirMatch = true;
bool needFindBestMatch = true;
if (ent.IsDir()) {
// The directory cache does not record which config matches a directory, so the
// blacklist of the current config is checked for every directory entry.
// TODO: Refactor directory cache, maintain all configs that match the directory.
needCheckDirMatch = false;
if (pConfig.first->IsDirectoryInBlacklist(item)) {
continue;
}
} else if (ent.IsRegFile()) {
// TODO: Add file cache looking up here: we can skip the file if it is in cache
// and the match flag is false (no config matches it).
// There is a cache in FindBestMatch, so the overhead is acceptable now.
needFindBestMatch = false;
if (!ConfigManager::GetInstance()->FindBestMatch(dirPath, entName).first) {
continue;
}
} else {
// Symbolic link should be passed, while other types file should ignore.
if (!ent.IsSymbolic()) {
LOG_DEBUG(sLogger, ("should ignore, other type file", item.c_str()));
continue;
}
}
// Mainly for symbolic (Linux), we need to use stat to dig out the real type.
fsutil::PathStat buf;
if (!fsutil::PathStat::stat(item, buf)) {
// NOTE(review): errno is read directly here while the rest of the function uses
// GetErrno() - confirm whether this is intended.
LOG_DEBUG(sLogger, ("get file info error", item.c_str())("errno", errno));
continue;
}
// For directory, poll recursively; for file, update cache and add to mNewFileVec so that
// it can be pushed to PollingModify at the end of polling.
// If both needCheckDirMatch and needFindBestMatch are still true, the item is a symbolic
// link: its real type is only known from the stat result, so the blacklist check
// (directory) or FindBestMatch (file, inside CheckAndUpdateFileMatchCache) is done now.
if (buf.IsDir() && (!needCheckDirMatch || !pConfig.first->IsDirectoryInBlacklist(item))) {
PollingNormalConfigPath(pConfig, dirPath, entName, buf, depth + 1);
} else if (buf.IsRegFile()) {
if (CheckAndUpdateFileMatchCache(dirPath, entName, buf, needFindBestMatch, exceedPreservedDirDepth)) {
LOG_DEBUG(sLogger, ("add to modify event", entName)("round", mCurrentRound));
mNewFileVec.push_back(SplitedFilePath(dirPath, entName));
}
} else {
// Ignore other file type.
LOG_DEBUG(sLogger, ("other type file is linked by a symbolic link, should ignore", item.c_str()));
continue;
}
}
return true;
}
// PollingWildcardConfigPath will iterate mWildcardPaths one by one, and according to
// corresponding value in mConstWildcardPaths, call PollingNormalConfigPath or call
// PollingWildcardConfigPath recursively.
//
// pConfig: the (options, pipeline context) pair being polled.
// dirPath: directory matched so far; its sub directories are matched against the next part.
// depth: index of the wildcard path part that dirPath corresponds to.
//
// Returns false when dirPath matches the host path blacklist, the depth check fails, depth is
// inconsistent with the wildcard path list, or dirPath does not exist when opened. Returns
// true when the current part is constant (whatever the result of the stat) or when dirPath
// can not be opened for a reason other than ENOENT. Otherwise returns whether at least one
// sub directory matched.
bool PollingDirFile::PollingWildcardConfigPath(const FileDiscoveryConfig& pConfig, const string& dirPath, int depth) {
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(dirPath)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", dirPath));
return false;
}
auto const wildcardPathSize = static_cast<int>(pConfig.first->GetWildcardPaths().size());
if (depth - wildcardPathSize > pConfig.first->mMaxDirSearchDepth)
return false;
// finish is true when the part matched at this level is the last wildcard path part.
bool finish = false;
if ((depth + 1) < (wildcardPathSize - 1))
finish = false;
else if ((depth + 1) == (wildcardPathSize - 1))
finish = true;
else {
// This should not happen.
LOG_ERROR(sLogger,
("PollingWildcardConfigPath error: ", dirPath.c_str())(pConfig.second->GetProjectName(),
pConfig.second->GetLogstoreName()));
return false;
}
// Current part is constant: there is no need to scan the whole directory, just check
// directly whether it exists.
if (!pConfig.first->GetConstWildcardPaths()[depth].empty()) {
// Stat directly, stat failure means that the directory is not existing or we have no
// permission to access it, just return true to stop polling.
string item = PathJoin(dirPath, pConfig.first->GetConstWildcardPaths()[depth]);
fsutil::PathStat baseDirStat;
if (!fsutil::PathStat::stat(item, baseDirStat)) {
LOG_DEBUG(sLogger,
("get wildcard dir info error: ", pConfig.first->GetBasePath())("stat path", item)(
pConfig.second->GetProjectName(),
pConfig.second->GetLogstoreName())("error", ErrnoToString(GetErrno())));
return true;
}
if (!baseDirStat.IsDir())
return true;
// finish indicates that current part is the last one, so we can
// call PollingNormalConfigPath to iterate remaining content.
// Otherwise, call PollingWildcardConfigPath to deal with remaining parts.
if (finish) {
PollingNormalConfigPath(pConfig, item, string(), baseDirStat, 0);
} else {
PollingWildcardConfigPath(pConfig, item, depth + 1);
}
return true;
}
// Current part is not constant (normal) path, so we have to iterate and match one by one.
bool hasMatchFlag = false;
fsutil::Dir dir(dirPath);
if (!dir.Open()) {
auto err = GetErrno();
if (fsutil::Dir::IsENOENT(err)) {
LOG_DEBUG(sLogger, ("Open dir fail, ENOENT, dir", dirPath.c_str()));
return false;
} else {
AlarmManager::GetInstance()->SendAlarm(LOGDIR_PERMISSION_ALARM,
string("Failed to open dir : ") + dirPath
+ ";\terrno : " + ToString(err),
pConfig.second->GetRegion(),
pConfig.second->GetProjectName(),
pConfig.second->GetConfigName(),
pConfig.second->GetLogstoreName());
LOG_WARNING(sLogger, ("Open dir fail", dirPath.c_str())("errno", err));
}
return true;
}
fsutil::Entry ent;
// Number of sub directories examined so far (matching or not).
int32_t dirCount = 0;
while ((ent = dir.ReadNext(false))) {
if (!mRuningFlag || mHoldOnFlag)
break;
// Give up on this directory once wildcard_max_sub_dir_count sub directories were examined.
if (dirCount >= INT32_FLAG(wildcard_max_sub_dir_count)) {
LOG_WARNING(sLogger,
("too many sub directoried for path",
dirPath)("dirCount", dirCount)("basePath", pConfig.first->GetBasePath()));
AlarmManager::GetInstance()->SendAlarm(STAT_LIMIT_ALARM,
string("too many sub directoried for path:" + dirPath
+ " dirCount: " + ToString(dirCount) + " basePath"
+ pConfig.first->GetBasePath()),
pConfig.second->GetRegion(),
pConfig.second->GetProjectName(),
pConfig.second->GetConfigName(),
pConfig.second->GetLogstoreName());
break;
}
// Throttle: sleep dirfile_stat_sleep ms every dirfile_stat_count entries.
if (++mStatCount % INT32_FLAG(dirfile_stat_count) == 0)
usleep(INT32_FLAG(dirfile_stat_sleep) * 1000);
// Budget of the whole round.
if (mStatCount > INT32_FLAG(polling_max_stat_count)) {
LOG_WARNING(sLogger,
("total dir's polling stat count is exceeded",
"")(dirPath, mStatCount)(pConfig.second->GetProjectName(), pConfig.second->GetLogstoreName()));
AlarmManager::GetInstance()->SendAlarm(
STAT_LIMIT_ALARM,
string("total dir's polling stat count is exceeded, total count:" + ToString(mStatCount)
+ " path: " + dirPath + " project:" + pConfig.second->GetProjectName()
+ " logstore:" + pConfig.second->GetLogstoreName()),
pConfig.second->GetRegion(),
pConfig.second->GetProjectName(),
pConfig.second->GetConfigName(),
pConfig.second->GetLogstoreName());
break;
}
auto entName = ent.Name();
string item = PathJoin(dirPath, entName);
fsutil::PathStat buf;
if (!fsutil::PathStat::stat(item, buf)) {
LOG_WARNING(sLogger, ("get file info fail", item.c_str())("errno", GetErrno()));
continue;
}
// Only directories can match a wildcard path part.
if (buf.IsDir()) {
++dirCount;
// Use the next part to match the entry name.
// dirIndex is the offset, inside the next wildcard path, at which the pattern for the
// entry name starts: the length of the current path plus the separator.
size_t dirIndex = 0;
if (!BOOL_FLAG(enable_root_path_collection)) {
// Handle special path /.
dirIndex = pConfig.first->GetWildcardPaths()[depth].size() + 1;
if (dirIndex == (size_t)2) {
dirIndex = 1;
}
} else {
// A better logic, but only enabled when flag enable_root_path_collection
// is set for backward compatibility.
dirIndex = pConfig.first->GetWildcardPaths()[depth].size();
if (PATH_SEPARATOR[0] == pConfig.first->GetWildcardPaths()[depth + 1][dirIndex]) {
++dirIndex;
}
}
if (fnmatch(&(pConfig.first->GetWildcardPaths()[depth + 1].at(dirIndex)), entName.c_str(), FNM_PATHNAME)
== 0) {
if (finish) {
// Last wildcard part matched: the rest is scanned as a normal path.
hasMatchFlag = true;
PollingNormalConfigPath(pConfig, item, string(), buf, 0);
} else {
hasMatchFlag |= PollingWildcardConfigPath(pConfig, item, depth + 1);
}
}
}
}
return hasMatchFlag;
}
// Evicts timed out entries from the directory and file caches. Does nothing when the previous
// effective run happened less than polling_check_timeout_interval seconds ago.
//
// Two kinds of entries are erased:
//   - entries flagged as exceeding the preserved dir depth whose last modify time is older
//     than timeout_interval seconds;
//   - when a cache holds more entries than its upper limit, entries whose last modify time
//     is older than polling_dir_timeout / polling_file_timeout seconds.
// Erased files that have a matched config and the event flag set are reported to
// PollingModify through AddDeleteFile, after the cache lock has been released.
void PollingDirFile::ClearTimeoutFileAndDir() {
int32_t curTime = (int32_t)time(NULL);
// Time of the last effective run; function-local static, so shared by all calls.
static int32_t s_lastClearTime = 0;
if (curTime - s_lastClearTime < INT32_FLAG(polling_check_timeout_interval)) {
return;
}
s_lastClearTime = curTime;
// Collect deleted files, so that it can notify PollingModify later.
std::vector<SplitedFilePath> deleteFileVec;
{
ScopedSpinLock lock(mCacheLock);
// Cached modify times are in nanoseconds, so both sides are scaled by NANO_CONVERTING.
bool clearExceedPreservedDirDepth = false;
for (auto iter = mDirCacheMap.begin(); iter != mDirCacheMap.end();) {
if (iter->second.GetExceedPreservedDirDepth()
&& (NANO_CONVERTING * curTime - iter->second.GetLastModifyTime())
> NANO_CONVERTING * INT32_FLAG(timeout_interval)) {
iter = mDirCacheMap.erase(iter);
clearExceedPreservedDirDepth = true;
} else
++iter;
}
if (clearExceedPreservedDirDepth) {
LOG_INFO(sLogger, ("After clear DirCache size", mDirCacheMap.size()));
}
// The flag is reused for the file cache.
clearExceedPreservedDirDepth = false;
for (auto iter = mFileCacheMap.begin(); iter != mFileCacheMap.end();) {
if (iter->second.GetExceedPreservedDirDepth()
&& (NANO_CONVERTING * curTime - iter->second.GetLastModifyTime())
> NANO_CONVERTING * INT32_FLAG(timeout_interval)) {
// If the file has been added to PollingModify, remove it here.
if (iter->second.HasMatchedConfig() && iter->second.HasEventFlag()) {
deleteFileVec.push_back(SplitedFilePath(iter->first));
LOG_INFO(sLogger, ("delete file cache", iter->first)("vec size", deleteFileVec.size()));
}
iter = mFileCacheMap.erase(iter);
clearExceedPreservedDirDepth = true;
} else
++iter;
}
if (clearExceedPreservedDirDepth) {
LOG_INFO(sLogger, ("After clear FileCache size", mFileCacheMap.size()));
}
// Directory cache over its upper limit: drop directories unchanged for polling_dir_timeout.
if (mDirCacheMap.size() > (size_t)INT32_FLAG(polling_dir_upperlimit)) {
LOG_INFO(sLogger, ("start clear dir cache", mDirCacheMap.size()));
for (auto iter = mDirCacheMap.begin(); iter != mDirCacheMap.end();) {
if ((NANO_CONVERTING * curTime - iter->second.GetLastModifyTime())
> NANO_CONVERTING * INT32_FLAG(polling_dir_timeout)) {
iter = mDirCacheMap.erase(iter);
} else
++iter;
}
LOG_INFO(sLogger, ("After clear DirCache size", mDirCacheMap.size()));
}
// File cache over its upper limit: drop files unchanged for polling_file_timeout.
if (mFileCacheMap.size() > (size_t)INT32_FLAG(polling_file_upperlimit)) {
LOG_INFO(sLogger, ("start clear file cache", mFileCacheMap.size()));
for (auto iter = mFileCacheMap.begin(); iter != mFileCacheMap.end();) {
if ((NANO_CONVERTING * curTime - iter->second.GetLastModifyTime())
> NANO_CONVERTING * INT32_FLAG(polling_file_timeout)) {
// If the file has been added to PollingModify, remove it here.
if (iter->second.HasMatchedConfig() && iter->second.HasEventFlag()) {
deleteFileVec.push_back(SplitedFilePath(iter->first));
LOG_INFO(sLogger, ("delete file cache", iter->first)("vec size", deleteFileVec.size()));
}
iter = mFileCacheMap.erase(iter);
} else
++iter;
}
LOG_INFO(sLogger, ("After clear FileCache size", mFileCacheMap.size()));
}
}
// Notify PollingModify outside of the cache lock.
if (!deleteFileVec.empty()) {
PollingModify::GetInstance()->AddDeleteFile(deleteFileVec);
}
}
void PollingDirFile::ClearUnavailableFileAndDir() {
// Collected genereated events.
// For directory, if its cache item becomes unavailable, a TIMEOUT event will be
// generated to notify that related directories should be unregistered to release
// resources (LogInput::ProcessEvent -> EventDispatcher::UnregisterAllDir).
std::vector<Event*> eventVec;
{
ScopedSpinLock lock(mCacheLock);
for (auto iter = mDirCacheMap.begin(); iter != mDirCacheMap.end();) {
auto& cacheItem = iter->second;
if (mCurrentRound - cacheItem.GetLastCheckRound() > (uint64_t)INT32_FLAG(delete_dir_file_round)) {
if (cacheItem.HasMatchedConfig()) {
eventVec.push_back(new Event(iter->first, string(), EVENT_TIMEOUT | EVENT_ISDIR, 0, 0));
}
iter = mDirCacheMap.erase(iter);
} else
++iter;
}
// Files need not to generate delete event, it is PollingModify's responsibility.
for (auto iter = mFileCacheMap.begin(); iter != mFileCacheMap.end();) {
if (mCurrentRound - iter->second.GetLastCheckRound() > (uint64_t)INT32_FLAG(delete_dir_file_round)) {
iter = mFileCacheMap.erase(iter);
} else
++iter;
}
}
if (!eventVec.empty())
PollingEventQueue::GetInstance()->PushEvent(eventVec);
}
// Starts with no stat performed and no polling round completed; PollingIteration() increments
// mCurrentRound before scanning, so the first round is numbered 1.
PollingDirFile::PollingDirFile() {
mStatCount = 0;
mCurrentRound = 0;
}
// Nothing to release explicitly: the destructor body is empty.
PollingDirFile::~PollingDirFile() = default;
} // namespace logtail