in core/file_server/polling/PollingDirFile.cpp [356:507]
bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig,
const string& srcPath,
const string& obj,
const fsutil::PathStat& statBuf,
int depth) {
if (pConfig.first->mMaxDirSearchDepth >= 0 && depth > pConfig.first->mMaxDirSearchDepth) {
return false;
}
bool exceedPreservedDirDepth = false;
if (pConfig.first->mPreservedDirDepth >= 0 && depth > pConfig.first->mPreservedDirDepth) {
exceedPreservedDirDepth = true;
int64_t sec = 0;
int64_t nsec = 0;
statBuf.GetLastWriteTime(sec, nsec);
auto curTime = time(nullptr);
LOG_DEBUG(sLogger, ("PollingNormalConfigPath", srcPath + "/" + obj)("curTime", curTime)("writeTime", sec));
if (curTime - sec > INT32_FLAG(timeout_interval)) {
return false;
}
}
string dirPath = obj.empty() ? srcPath : PathJoin(srcPath, obj);
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(dirPath)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", dirPath));
return false;
}
bool isNewDirectory = false;
if (!CheckAndUpdateDirMatchCache(dirPath, statBuf, exceedPreservedDirDepth, isNewDirectory))
return true;
if (isNewDirectory) {
PollingEventQueue::GetInstance()->PushEvent(new Event(srcPath, obj, EVENT_CREATE | EVENT_ISDIR, -1, 0));
}
// Iterate directories and files in dirPath.
fsutil::Dir dir(dirPath);
if (!dir.Open()) {
auto err = GetErrno();
if (fsutil::Dir::IsENOENT(err)) {
LOG_DEBUG(sLogger, ("Open dir error, ENOENT, dir", dirPath.c_str()));
return false;
} else {
AlarmManager::GetInstance()->SendAlarm(LOGDIR_PERMISSION_ALARM,
string("Failed to open dir : ") + dirPath
+ ";\terrno : " + ToString(err),
pConfig.second->GetRegion(),
pConfig.second->GetProjectName(),
pConfig.second->GetConfigName(),
pConfig.second->GetLogstoreName());
LOG_ERROR(sLogger, ("Open dir error", dirPath.c_str())("error", ErrnoToString(err)));
}
return true;
}
int32_t nowStatCount = 0;
fsutil::Entry ent;
while ((ent = dir.ReadNext(false))) {
if (!mRuningFlag || mHoldOnFlag)
break;
if (++mStatCount % INT32_FLAG(dirfile_stat_count) == 0) {
usleep(INT32_FLAG(dirfile_stat_sleep) * 1000);
}
if (mStatCount > INT32_FLAG(polling_max_stat_count)) {
LOG_WARNING(sLogger,
("total dir's polling stat count is exceeded", nowStatCount)(dirPath, mStatCount)(
pConfig.second->GetProjectName(), pConfig.second->GetLogstoreName()));
AlarmManager::GetInstance()->SendAlarm(
STAT_LIMIT_ALARM,
string("total dir's polling stat count is exceeded, now count:") + ToString(nowStatCount)
+ " total count:" + ToString(mStatCount) + " path: " + dirPath
+ " project:" + pConfig.second->GetProjectName() + " logstore:" + pConfig.second->GetLogstoreName(),
pConfig.second->GetRegion(),
pConfig.second->GetProjectName(),
pConfig.second->GetConfigName(),
pConfig.second->GetLogstoreName());
break;
}
if (++nowStatCount > INT32_FLAG(polling_max_stat_count_per_dir)) {
LOG_WARNING(sLogger,
("this dir's polling stat count is exceeded", nowStatCount)(dirPath, mStatCount)(
pConfig.second->GetProjectName(), pConfig.second->GetLogstoreName()));
AlarmManager::GetInstance()->SendAlarm(
STAT_LIMIT_ALARM,
string("this dir's polling stat count is exceeded, now count:") + ToString(nowStatCount)
+ " total count:" + ToString(mStatCount) + " path: " + dirPath
+ " project:" + pConfig.second->GetProjectName() + " logstore:" + pConfig.second->GetLogstoreName(),
pConfig.second->GetRegion(),
pConfig.second->GetProjectName(),
pConfig.second->GetConfigName(),
pConfig.second->GetLogstoreName());
break;
}
// If the type of item is raw directory or file, use MatchDirPattern or FindBestMatch
// to check if there are configs that match it.
auto entName = ent.Name();
string item = PathJoin(dirPath, entName);
bool needCheckDirMatch = true;
bool needFindBestMatch = true;
if (ent.IsDir()) {
// Have to call MatchDirPattern, because we have no idea which config matches
// the directory according to cache.
// TODO: Refactor directory cache, maintain all configs that match the directory.
needCheckDirMatch = false;
if (pConfig.first->IsDirectoryInBlacklist(item)) {
continue;
}
} else if (ent.IsRegFile()) {
// TODO: Add file cache looking up here: we can skip the file if it is in cache
// and the match flag is false (no config matches it).
// There is a cache in FindBestMatch, so the overhead is acceptable now.
needFindBestMatch = false;
if (!ConfigManager::GetInstance()->FindBestMatch(dirPath, entName).first) {
continue;
}
} else {
// Symbolic link should be passed, while other types file should ignore.
if (!ent.IsSymbolic()) {
LOG_DEBUG(sLogger, ("should ignore, other type file", item.c_str()));
continue;
}
}
// Mainly for symbolic (Linux), we need to use stat to dig out the real type.
fsutil::PathStat buf;
if (!fsutil::PathStat::stat(item, buf)) {
LOG_DEBUG(sLogger, ("get file info error", item.c_str())("errno", errno));
continue;
}
// For directory, poll recursively; for file, update cache and add to mNewFileVec so that
// it can be pushed to PollingModify at the end of polling.
// If needCheckDirMatch or needFindBestMatch is true, that means the item is a symbolic link.
// We should check file type again to make sure that the original file which linked by
// a symbolic file is DIR or REG.
if (buf.IsDir() && (!needCheckDirMatch || !pConfig.first->IsDirectoryInBlacklist(item))) {
PollingNormalConfigPath(pConfig, dirPath, entName, buf, depth + 1);
} else if (buf.IsRegFile()) {
if (CheckAndUpdateFileMatchCache(dirPath, entName, buf, needFindBestMatch, exceedPreservedDirDepth)) {
LOG_DEBUG(sLogger, ("add to modify event", entName)("round", mCurrentRound));
mNewFileVec.push_back(SplitedFilePath(dirPath, entName));
}
} else {
// Ignore other file type.
LOG_DEBUG(sLogger, ("other type file is linked by a symbolic link, should ignore", item.c_str()));
continue;
}
}
return true;
}