core/file_server/EventDispatcher.cpp (870 lines of code) (raw):
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "EventDispatcher.h"
#include "Flags.h"
#if defined(__linux__)
#include <fnmatch.h>
#include <sys/epoll.h>
#include <sys/inotify.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/un.h>
#endif
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <sys/types.h>
#include <vector>
#include "app_config/AppConfig.h"
#include "checkpoint/CheckPointManager.h"
#include "checkpoint/CheckpointManagerV2.h"
#include "common/ErrorUtil.h"
#include "common/FileSystemUtil.h"
#include "common/HashUtil.h"
#include "common/LogtailCommonFlags.h"
#include "common/RuntimeUtil.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "common/version.h"
#include "file_server/event/Event.h"
#include "file_server/event_handler/EventHandler.h"
#include "file_server/event_handler/LogInput.h"
#include "file_server/polling/PollingDirFile.h"
#include "file_server/polling/PollingModify.h"
#include "monitor/AlarmManager.h"
#include "protobuf/sls/metric.pb.h"
#include "protobuf/sls/sls_logs.pb.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "file_server/polling/PollingEventQueue.h"
#endif
#include "application/Application.h"
#include "collection_pipeline/CollectionPipelineManager.h"
#include "collection_pipeline/plugin/PluginRegistry.h"
#include "file_server/ConfigManager.h"
#include "file_server/FileServer.h"
#include "go_pipeline/LogtailPlugin.h"
#include "plugin/input/InputContainerStdio.h"
#include "plugin/input/InputFile.h"
using namespace std;
using namespace sls_logs;
// Commercial edition only:
// DEFINE_FLAG_BOOL(merge_shennong_metric, "merge LogGroup into LogPackageList if true", true);
// Deprecated:
// DEFINE_FLAG_INT32(main_loop_check_interval, "seconds", 60);
// Moved (no longer defined in this file):
// DEFINE_FLAG_INT32(ilogtail_epoll_time_out, "default time out is 1s", 1);
// DEFINE_FLAG_INT32(tcmalloc_release_memory_interval, "force release memory held by tcmalloc, seconds", 300);
// DEFINE_FLAG_BOOL(fs_events_inotify_enable, "", true);
// DEFINE_FLAG_INT32(exit_flushout_duration, "exit process flushout duration", 20 * 1000);
// DEFINE_FLAG_INT32(search_checkpoint_default_dir_depth, "0 means only search current directory", 0);
// DEFINE_FLAG_BOOL(enable_polling_discovery, "", true);

// When a directory is first watched, existing files modified within this many seconds get a MODIFY event
// (see AddExistedFileEvents()).
DEFINE_FLAG_INT32(existed_file_active_timeout,
                  "when first monitor directory, file modified in 120 seconds will be collected",
                  120);
// Upper bound on the dev/inode -> path cache used while validating checkpoints. Once the cache is this large,
// a checkpoint that would need a directory search is deleted instead (kCacheFull in validateCheckpoint()).
DEFINE_FLAG_INT32(checkpoint_find_max_cache_size, "", 100000);
// Maximum number of directories registered in this dispatcher, with or without an inotify watch.
DEFINE_FLAG_INT32(max_watch_dir_count, "", 100 * 1000);
// Maximum number of directories given a real inotify watch. Beyond this limit, directories are still
// registered, but without an inotify watch.
DEFINE_FLAG_INT32(default_max_inotify_watch_num, "the max allowed inotify watch dir number", 3000);
namespace logtail {
EventDispatcher::EventDispatcher() : mWatchNum(0), mInotifyWatchNum(0), mEventListener(EventListener::GetInstance()) {
    /*
     * May add multiple inotify fd instances in the future,
     * so use epoll here though a little more sophisticated than select
     */
    // NOTE: epoll is used to drive mListenFd and mStreaLogTcpFd, they only work on Linux.
    // #if defined(__linux__)
    //     mEpollFd = epoll_create(INT32_FLAG(ilogtail_max_epoll_events));
    //     mListenFd = -1;
    //     mStreamLogTcpFd = -1;
    // #endif
    if (!AppConfig::GetInstance()->NoInotify()) {
        if (!mEventListener->Init()) {
            // Capture the error code once, immediately after the failing call. SendAlarm() may do I/O that
            // overwrites errno, and the alarm and the log line must report the same value.
            const auto initErrno = GetErrno();
            AlarmManager::GetInstance()->SendAlarm(EPOLL_ERROR_ALARM,
                                                   string("faild to init inotify fd, errno:") + ToString(initErrno));
            LOG_ERROR(sLogger, ("faild to init inotify fd, errno:", initErrno));
        }
    } else {
        LOG_INFO(sLogger, ("do not use inoitfy", ""));
    }
    /** Register a timeout handler.
     * timeoutHandler -> Handle is called whenever no events occur
     * under any path added by AddTimeoutWatch or its offsprings' during timeout seconds.
     * That is to say, if any event occurs under one directory,
     * its time record propagates to its ancestors who also are added to timeout watch.
     * Propagation stops at the ancestor who hasn't been added to timeout watch.
     * Time is accounted when Dispatch is called.
     * mTimeoutHandler -> Handle is called when no events occur under path during timeout seconds.
     */
    mTimeoutHandler = new TimeoutHandler();
    // Directories without a real inotify watch get negative pseudo descriptors, handed out downward from -1.
    mNonInotifyWd = -1;
}
EventDispatcher::~EventDispatcher() {
    // The Linux-only epoll / stream-log resources (StreamLogManager, mEpollFd, mStreamLogTcpFd, mListenFd)
    // are currently commented out, so nothing is released for them here. Former cleanup, kept for reference:
    // #if defined(__linux__)
    //     if (mStreamLogManagerPtr != NULL) {
    //         delete (StreamLogManager*)mStreamLogManagerPtr;
    //     }
    //     if (mEpollFd >= 0)
    //         close(mEpollFd);
    //     if (mStreamLogTcpFd >= 0)
    //         close(mStreamLogTcpFd);
    //     if (mListenFd >= 0)
    //         close(mListenFd);
    // #endif
    mEventListener->Destroy();

    // delete on a null pointer is a no-op, so no explicit null check is required.
    delete mTimeoutHandler;
}
bool EventDispatcher::RegisterEventHandler(const string& path,
const FileDiscoveryConfig& config,
EventHandler*& handler) {
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", path));
return false;
}
// @todo
// if this path belong to many config, if register one config with max_depth 0, then it will register fail
if (!config.first->WithinMaxDepth(path)) {
LOG_DEBUG(sLogger,
("path is out of maxDepth",
path)("logstore", config.second->GetLogstoreName())("max depth", config.first->mMaxDirSearchDepth));
return false;
}
fsutil::PathStat statBuf;
// if symbolic link, return the referred inode
if (!fsutil::PathStat::stat(path, statBuf)) {
if (errno != EEXIST) {
LOG_WARNING(sLogger, ("call stat() on path fail", path)("errno", errno));
AlarmManager::GetInstance()->SendAlarm(REGISTER_INOTIFY_FAIL_ALARM,
"call stat() on path fail" + string(path)
+ ", errno: " + ToString(errno) + ", will not be monitored",
config.second->GetRegion(),
config.second->GetProjectName(),
config.second->GetConfigName(),
config.second->GetLogstoreName());
}
LOG_DEBUG(sLogger, ("call stat() on path fail", path)("errno", errno));
return false;
}
if (!statBuf.IsDir()) {
LOG_DEBUG(sLogger, ("path is not directory, will not register inotify monitor", path));
return false;
}
uint64_t inode = statBuf.GetDevInode().inode;
int wd = -1;
MapType<string, int>::Type::iterator pathIter = mPathWdMap.find(path);
if (pathIter != mPathWdMap.end()) {
wd = pathIter->second;
if (inode != mWdDirInfoMap[wd]->mInode) {
LOG_INFO(sLogger,
("dir's inode was changed", path)("inode_before", mWdDirInfoMap[wd]->mInode)("inode_now", inode));
}
if (!AppConfig::GetInstance()->IgnoreDirInodeChanged() && inode != mWdDirInfoMap[wd]->mInode) {
LOG_INFO(sLogger,
("dir's inode was changed, unregister this path recursively",
path)("inode_before", mWdDirInfoMap[wd]->mInode)("inode_now", inode));
UnregisterAllDir(path);
} else {
if (handler == ConfigManager::GetInstance()->GetSharedHandler()) {
LOG_DEBUG(
sLogger,
("ignore replace handler",
"both path and inode of dir is registered, handler is sharedHandler")("path", path)("wd", wd));
return true;
} else if (ConfigManager::GetInstance()->GetSharedHandler() == mWdDirInfoMap[wd]->mHandler) {
LOG_DEBUG(sLogger,
("replace handler", "both path and inode of dir is registered, just replace EventHandler")(
"path", path)("wd", wd));
mWdDirInfoMap[wd]->mHandler = handler;
return true;
} else {
LOG_DEBUG(sLogger, ("still use current hander, dir", path)("inode", inode)("wd", wd));
handler = mWdDirInfoMap[wd]->mHandler;
return true;
}
}
}
if (mWatchNum >= INT32_FLAG(max_watch_dir_count)) {
LOG_WARNING(sLogger,
("fail to monitor dir, max_watch_dir_count", INT32_FLAG(max_watch_dir_count))("dir", path));
AlarmManager::GetInstance()->SendAlarm(DIR_EXCEED_LIMIT_ALARM,
string("dir: ") + path
+ " will not monitored, dir count should less than "
+ ToString(INT32_FLAG(max_watch_dir_count)),
config.second->GetRegion(),
config.second->GetProjectName(),
config.second->GetConfigName(),
config.second->GetLogstoreName());
return false;
}
wd = -1;
if (mInotifyWatchNum >= INT32_FLAG(default_max_inotify_watch_num)) {
LOG_INFO(sLogger,
("failed to add inotify watcher for dir", path)("max allowed inotify watchers",
INT32_FLAG(default_max_inotify_watch_num)));
AlarmManager::GetInstance()->SendAlarm(INOTIFY_DIR_NUM_LIMIT_ALARM,
string("failed to register inotify watcher for dir") + path,
config.second->GetRegion(),
config.second->GetProjectName(),
config.second->GetConfigName(),
config.second->GetLogstoreName());
} else {
// need check mEventListener valid
if (mEventListener->IsInit() && !AppConfig::GetInstance()->IsInInotifyBlackList(path)) {
wd = mEventListener->AddWatch(path.c_str());
if (!EventListener::IsValidID(wd)) {
string str = ErrnoToString(GetErrno());
LOG_WARNING(sLogger, ("failed to register dir", path)("reason", str));
#if defined(__linux__)
// work around bug 13229654
if (errno == EINVAL || errno == EBADF) {
LOG_ERROR(sLogger,
("failed to register dir", path)("errno", errno)("error", str)("force exit",
"wait 10 seconds."));
AlarmManager::GetInstance()->SendAlarm(LOGTAIL_CRASH_ALARM,
string("Failed to register dir: ") + path + ", errno: "
+ ToString(errno) + ", error: " + str + ", force exit",
config.second->GetRegion(),
config.second->GetProjectName(),
config.second->GetConfigName(),
config.second->GetLogstoreName());
AlarmManager::GetInstance()->ForceToSend();
sleep(10);
_exit(1);
}
#endif
if (config.first->IsTimeout(path))
AlarmManager::GetInstance()->SendAlarm(REGISTER_INOTIFY_FAIL_ALARM,
string("Failed to register dir: ") + path + ", reason: "
+ str + ", project: " + config.second->GetProjectName()
+ ", logstore: " + config.second->GetLogstoreName(),
config.second->GetRegion(),
config.second->GetProjectName(),
config.second->GetConfigName(),
config.second->GetLogstoreName());
else
AlarmManager::GetInstance()->SendAlarm(REGISTER_INOTIFY_FAIL_ALARM,
string("Failed to register dir: ") + path
+ ", reason: " + str + ", no timeout",
config.second->GetRegion(),
config.second->GetProjectName(),
config.second->GetConfigName(),
config.second->GetLogstoreName());
} else {
// recheck inode, wd is relevance to inode
if (mWdDirInfoMap.find(wd) != mWdDirInfoMap.end()) {
// e.g. this path is symbolic link, and it reference to dir which was registered before
LOG_DEBUG(sLogger,
("can not register inotify monitor", path)("inode", inode)("wd", wd)(
"reason", "there is already a dir in inotify watch list shard the same inode"));
wd = -1;
} else
mInotifyWatchNum++;
}
}
}
bool dirTimeOutFlag = config.first->IsTimeout(path);
if (!EventListener::IsValidID(wd)) {
wd = mNonInotifyWd;
if (mNonInotifyWd == INT_MIN)
mNonInotifyWd = -1;
else
--mNonInotifyWd;
}
fsutil::PathStat lstatBuf;
bool isSymbolicLink = false;
if (fsutil::PathStat::lstat(path, lstatBuf) && lstatBuf.IsLink())
isSymbolicLink = true;
if (mBrokenLinkSet.find(path) != mBrokenLinkSet.end()) {
mBrokenLinkSet.erase(path);
}
LOG_INFO(sLogger,
("add a new watcher for dir", path)("wd", wd)("dir inode", inode)("isSymbolicLink", isSymbolicLink));
DirInfo* dirInfo = new DirInfo(path, inode, isSymbolicLink, handler);
AddOneToOneMapEntry(dirInfo, wd);
++mWatchNum;
AddExistedFileEvents(path, wd);
if (dirTimeOutFlag) {
AddTimeoutWatch(path);
LOG_DEBUG(sLogger,
("AddTimeoutWatch, source", path)("config, basepath", config.first->GetBasePath())(
"preseveDepth", config.first->mPreservedDirDepth)("maxDepth", config.first->mMaxDirSearchDepth));
}
return true;
}
// read files when add dir inotify watcher at first time
void EventDispatcher::AddExistedFileEvents(const string& path, int wd) {
fsutil::Dir dir(path);
if (!dir.Open()) {
auto err = GetErrno();
if (!fsutil::Dir::IsENOENT(err)) {
LOG_ERROR(sLogger, ("Open dir fail", path)("error", ErrnoToString(err)));
}
return;
}
int32_t MAX_TAIL_FILE_COUNT = 50;
vector<FileDiscoveryConfig> configs;
ConfigManager::GetInstance()->GetRelatedConfigs(path, configs);
for (auto iter = configs.begin(); iter != configs.end(); ++iter) {
if ((*iter).first->mMaxDirSearchDepth == 0) {
MAX_TAIL_FILE_COUNT = 1000;
break;
}
}
fsutil::Entry ent;
int32_t curTime = time(NULL);
vector<Event*> eventVec;
int32_t tailFileCount = 0;
while ((ent = dir.ReadNext(false))) {
++tailFileCount;
if (tailFileCount > MAX_TAIL_FILE_COUNT) {
LOG_DEBUG(sLogger,
("tail existed files for dir", path)("too many files", "ignore the rest")("max file count",
MAX_TAIL_FILE_COUNT));
break;
}
auto entName = ent.Name();
if (!IsValidSuffix(entName))
continue;
string item = PathJoin(path, entName);
// move to AddExistedCheckPointFileEvents
// CheckPointPtr checkPointSharePtr;
// if (CheckPointManager::Instance()->GetCheckPoint(item, checkPointSharePtr))
//{
// if (checkPointSharePtr->mInode == NO_BLOCK_INODE || checkPointSharePtr->mInode == GetFileInode(item))
// {
// eventVec.push_back(new Event(string(path), string(ent->d_name), EVENT_MODIFY, wd));
// }
// else
// {
// CheckPointManager::Instance()->DeleteCheckPoint(item);
// }
// continue;
//}
bool isMatch = false;
bool tailExisted = false;
for (auto iter = configs.begin(); iter != configs.end(); ++iter) {
if (fnmatch((*iter).first->GetFilePattern().c_str(), entName.c_str(), 0) == 0) {
isMatch = true;
if ((*iter).first->IsTailingAllMatchedFiles()) {
tailExisted = true;
break;
}
}
}
if (!isMatch)
continue;
fsutil::PathStat buf;
if (fsutil::PathStat::stat(item, buf) && buf.IsRegFile()
&& (curTime - int32_t(buf.GetMtime()) < INT32_FLAG(existed_file_active_timeout) || tailExisted)) {
eventVec.push_back(new Event(string(path), entName, EVENT_MODIFY, wd));
}
}
for (size_t i = 0; i < eventVec.size(); ++i) {
LOG_INFO(sLogger,
("generate MODIFY event for the recently updated existing file",
PathJoin(eventVec[i]->GetSource(), eventVec[i]->GetObject())));
}
if (eventVec.size() > 0)
LogInput::GetInstance()->PushEventQueue(eventVec);
}
// Validate one file checkpoint against the current configs and the file system.
//
// On kNormal or kRotate, an EVENT_MODIFY for the checkpointed file is appended to `eventVec`, tagged with the
// checkpoint's config name. The checkpoint may also be updated in place: its device id is refreshed, or
// mRealFileName is pointed at the file's new location after a rename. Any other result tells the caller the
// checkpoint is no longer valid.
//
// `cachePathDevInodeMap` is shared across calls to avoid repeated directory searches. Entries are erased here
// and are presumably added by SearchFilePathByDevInodeInDirectory (verify).
EventDispatcher::ValidateCheckpointResult EventDispatcher::validateCheckpoint(
    CheckPointPtr& checkpoint, map<DevInode, SplitedFilePath>& cachePathDevInodeMap, vector<Event*>& eventVec) {
    shared_ptr<CollectionPipeline> config
        = CollectionPipelineManager::GetInstance()->FindConfigByName(checkpoint->mConfigName);
    if (!config) {
        LOG_INFO(sLogger,
                 ("delete checkpoint", "the corresponding config is deleted")("config", checkpoint->mConfigName)(
                     "log reader queue name", checkpoint->mFileName)("real file path", checkpoint->mRealFileName)(
                     "file device", checkpoint->mDevInode.dev)("file inode", checkpoint->mDevInode.inode));
        return ValidateCheckpointResult::kConfigNotFound;
    }

    // Use FileName (logical absolute path) to do config matching.
    const string& filePath = checkpoint->mFileName;
    const string realFilePath = checkpoint->mRealFileName.empty() ? filePath : checkpoint->mRealFileName;
    size_t lastSeparator = filePath.find_last_of(PATH_SEPARATOR);
    if (lastSeparator == string::npos || lastSeparator == (size_t)0 || lastSeparator >= filePath.size()) {
        LOG_INFO(sLogger,
                 ("delete checkpoint", "invalid log reader queue name")("config", checkpoint->mConfigName)(
                     "log reader queue name", checkpoint->mFileName)("real file path", checkpoint->mRealFileName)(
                     "file device", checkpoint->mDevInode.dev)("file inode", checkpoint->mDevInode.inode));
        return ValidateCheckpointResult::kInvalidFilePath;
    }
    string path = filePath.substr(0, lastSeparator);
    string fileName = filePath.substr(lastSeparator + 1);

    // Check if the config in checkpoint still matches the file?
    vector<FileDiscoveryConfig> matchedConfigs;
    AppConfig::GetInstance()->IsAcceptMultiConfig()
        ? ConfigManager::GetInstance()->FindAllMatch(matchedConfigs, path, fileName)
        : ConfigManager::GetInstance()->FindMatchWithForceFlag(matchedConfigs, path, fileName);
    bool stillMatch = false;
    for (size_t idx = 0; idx < matchedConfigs.size(); ++idx) {
        if (matchedConfigs[idx].second->GetConfigName() == checkpoint->mConfigName) {
            stillMatch = true;
            break;
        }
    }
    if (!stillMatch) {
        LOG_INFO(
            sLogger,
            ("delete checkpoint", "original config no more matches the file path")("config", checkpoint->mConfigName)(
                "log reader queue name", checkpoint->mFileName)("real file path", checkpoint->mRealFileName)(
                "file device", checkpoint->mDevInode.dev)("file inode", checkpoint->mDevInode.inode));
        return ValidateCheckpointResult::kConfigNotMatched;
    }

    // Determine the type of input plugin. Guard against a pipeline without inputs instead of indexing [0]
    // unconditionally; such a pipeline is treated like one with an unrecognized input plugin.
    const auto& inputs = config->GetInputs();
    const InputFile* inputFile = nullptr;
    const InputContainerStdio* inputContainerStdio = nullptr;
    if (!inputs.empty()) {
        string name = inputs[0]->GetPlugin()->Name();
        if (name == InputFile::sName) {
            inputFile = static_cast<const InputFile*>(inputs[0]->GetPlugin());
        } else if (name == InputContainerStdio::sName) {
            inputContainerStdio = static_cast<const InputContainerStdio*>(inputs[0]->GetPlugin());
        }
    }

    // delete checkpoint if file path is not exist
    MapType<string, int>::Type::iterator pathIter = mPathWdMap.find(path);
    if (pathIter == mPathWdMap.end()) {
        LOG_INFO(sLogger,
                 ("delete checkpoint", "file path no longer exists")("config", checkpoint->mConfigName)(
                     "log reader queue name", checkpoint->mFileName)("real file path", checkpoint->mRealFileName)(
                     "file device", checkpoint->mDevInode.dev)("file inode", checkpoint->mDevInode.inode));
        return ValidateCheckpointResult::kLogDirNotWatched;
    }
    int wd = pathIter->second;

    // Queue an EVENT_MODIFY for the checkpointed file, tagged with the checkpoint's config name.
    // The checkpoint's dev/inode are read at call time, so an in-place dev refresh is reflected.
    auto pushModifyEvent = [&]() {
        eventVec.push_back(
            new Event(path, fileName, EVENT_MODIFY, wd, 0, checkpoint->mDevInode.dev, checkpoint->mDevInode.inode));
        eventVec.back()->SetConfigName(checkpoint->mConfigName);
    };

    DevInode devInode = GetFileDevInode(realFilePath);
    if (devInode.IsValid() && checkpoint->mDevInode.inode == devInode.inode) {
        if (!CheckFileSignature(realFilePath, checkpoint->mSignatureHash, checkpoint->mSignatureSize)) {
            LOG_INFO(sLogger,
                     ("delete checkpoint", "file device & inode remains the same but signature has changed")(
                         "config", checkpoint->mConfigName)("log reader queue name", checkpoint->mFileName)(
                         "real file path", checkpoint->mRealFileName)("file device", checkpoint->mDevInode.dev)(
                         "file inode", checkpoint->mDevInode.inode));
            return ValidateCheckpointResult::kSigChanged;
        }
        if (checkpoint->mDevInode.dev != devInode.dev) {
            // all other checks passed. dev may be a stateful set pv remounted on another node
            checkpoint->mDevInode.dev = devInode.dev;
        }
        LOG_INFO(sLogger,
                 ("generate MODIFY event for file with checkpoint",
                  "nothing changed on the file")("config", checkpoint->mConfigName)("log reader queue name", filePath)(
                     "real file path", realFilePath)("file device", checkpoint->mDevInode.dev)(
                     "file inode", checkpoint->mDevInode.inode)("signature", checkpoint->mSignatureHash)(
                     "last file position", checkpoint->mOffset)("is file open when dumped",
                                                                ToString(checkpoint->mFileOpenFlag))(
                     "is container stopped when dumped", ToString(checkpoint->mContainerStopped)));
        pushModifyEvent();
        return ValidateCheckpointResult::kNormal;
    }

    // File path not exist or dev inode mismatch, do search.
    // Can not do search for checkpoint with signature size == 0.
    // See https://aone.alibaba-inc.com/req/29052357.
    if (0 == checkpoint->mSignatureSize) {
        LOG_INFO(sLogger,
                 ("delete checkpoint", "file signature size is zero")("config", checkpoint->mConfigName)(
                     "log reader queue name", checkpoint->mFileName)("real file path", checkpoint->mRealFileName)(
                     "file device", checkpoint->mDevInode.dev)("file inode", checkpoint->mDevInode.inode));
        return ValidateCheckpointResult::kZeroSigSize;
    }

    // Try to find the real file with dev inode, check cache at first.
    map<DevInode, SplitedFilePath>::iterator findIter = cachePathDevInodeMap.find(checkpoint->mDevInode);
    if (findIter != cachePathDevInodeMap.end()) {
        if (findIter->second.mFileDir != path) {
            LOG_INFO(sLogger,
                     ("delete checkpoint", "file has been moved to other dir")("config", checkpoint->mConfigName)(
                         "log reader queue name", checkpoint->mFileName)("original real file path", realFilePath)(
                         "new real file path", PathJoin(findIter->second.mFileDir, findIter->second.mFileName))(
                         "file device", checkpoint->mDevInode.dev)("file inode", checkpoint->mDevInode.inode));
            return ValidateCheckpointResult::kLogDirChanged;
        }
        if (CheckFileSignature(
                PathJoin(path, findIter->second.mFileName), checkpoint->mSignatureHash, checkpoint->mSignatureSize)) {
            checkpoint->mRealFileName = PathJoin(findIter->second.mFileDir, findIter->second.mFileName);
            LOG_INFO(sLogger,
                     ("generate MODIFY event for file with checkpoint",
                      "file has been renamed, but still in the same dir")("config", checkpoint->mConfigName)(
                         "log reader queue name", filePath)("original real file path", realFilePath)(
                         "new real file path", checkpoint->mRealFileName)("file device", checkpoint->mDevInode.dev)(
                         "file inode", checkpoint->mDevInode.inode)("signature", checkpoint->mSignatureHash)(
                         "last file position", checkpoint->mOffset)("is file open when dumped",
                                                                    ToString(checkpoint->mFileOpenFlag))(
                         "is container stopped when dumped", ToString(checkpoint->mContainerStopped)));
            pushModifyEvent();
            return ValidateCheckpointResult::kRotate;
        }
        if ((inputFile && 0 == inputFile->mExactlyOnceConcurrency) || inputContainerStdio) {
            LOG_INFO(sLogger,
                     ("ignore check point, file signature has changed", filePath)("old real path", realFilePath)(
                         findIter->second.mFileDir, findIter->second.mFileName)("inode", checkpoint->mDevInode.inode));
            return ValidateCheckpointResult::kSigChanged;
        }
        // Exactly-once inputs: the cached mapping may be stale, so drop it and fall through to a fresh search.
        LOG_INFO(sLogger,
                 ("exactly once checkpoint", "delete cache and retry search")("old real path", realFilePath)(
                     findIter->second.mFileDir, findIter->second.mFileName)("inode", checkpoint->mDevInode.inode));
        cachePathDevInodeMap.erase(findIter);
    }

    if (cachePathDevInodeMap.size() >= (size_t)INT32_FLAG(checkpoint_find_max_cache_size)) {
        LOG_WARNING(
            sLogger,
            ("delete checkpoint", "cannot find the file because of full find cache")("config", checkpoint->mConfigName)(
                "log reader queue name", checkpoint->mFileName)("real file path", checkpoint->mRealFileName)(
                "file device", checkpoint->mDevInode.dev)("file inode", checkpoint->mDevInode.inode));
        AlarmManager::GetInstance()->SendAlarm(
            CHECKPOINT_ALARM,
            string("cannot find the file because of full find cache, delete the checkpoint, log reader queue name: ")
                + filePath + ", real file path: " + realFilePath,
            config->GetContext().GetRegion(),
            config->GetContext().GetProjectName(),
            config->GetContext().GetConfigName(),
            config->GetContext().GetLogstoreName());
        return ValidateCheckpointResult::kCacheFull;
    }

    uint16_t searchDepth = 0;
    if (inputFile) {
        searchDepth = inputFile->mMaxCheckpointDirSearchDepth;
    }
    auto const searchResult
        = SearchFilePathByDevInodeInDirectory(path, searchDepth, checkpoint->mDevInode, &cachePathDevInodeMap);
    if (searchResult) {
        const auto& newRealPath = searchResult.value();
        if (CheckFileSignature(newRealPath, checkpoint->mSignatureHash, checkpoint->mSignatureSize)) {
            checkpoint->mRealFileName = newRealPath;
            LOG_INFO(sLogger,
                     ("generate MODIFY event for file with checkpoint",
                      "file has been renamed, but still in the same dir")("config", checkpoint->mConfigName)(
                         "log reader queue name", filePath)("original real file path", realFilePath)(
                         "new real file path", checkpoint->mRealFileName)("file device", checkpoint->mDevInode.dev)(
                         "file inode", checkpoint->mDevInode.inode)("signature", checkpoint->mSignatureHash)(
                         "last file position", checkpoint->mOffset)("is file open when dumped",
                                                                    ToString(checkpoint->mFileOpenFlag))(
                         "is container stopped when dumped", ToString(checkpoint->mContainerStopped)));
            pushModifyEvent();
            return ValidateCheckpointResult::kRotate;
        }
        LOG_INFO(sLogger,
                 ("delete checkpoint", "file has been renamed but signature has changed")(
                     "config", checkpoint->mConfigName)("log reader queue name", checkpoint->mFileName)(
                     "original real file path", realFilePath)("new real file path", newRealPath)(
                     "file device", checkpoint->mDevInode.dev)("file inode", checkpoint->mDevInode.inode));
        return ValidateCheckpointResult::kSigChanged;
    }

    // Can not find dev inode, delete this checkpoint.
    LOG_INFO(sLogger,
             ("delete checkpoint", "cannot find the file any more")("config", checkpoint->mConfigName)(
                 "log reader queue name", checkpoint->mFileName)("real file path", checkpoint->mRealFileName)(
                 "file device", checkpoint->mDevInode.dev)("file inode", checkpoint->mDevInode.inode));
    return ValidateCheckpointResult::kDevInodeNotFound;
}
void EventDispatcher::AddExistedCheckPointFileEvents() {
// All checkpoint will be add into event queue or be deleted
// This operation will delete not existed file's check point
map<DevInode, SplitedFilePath> cachePathDevInodeMap;
auto& checkPointMap = CheckPointManager::Instance()->GetAllFileCheckPoint();
LOG_INFO(sLogger, ("start to verify existed checkpoints, total checkpoint count", checkPointMap.size()));
vector<CheckPointManager::CheckPointKey> deleteKeyVec;
vector<Event*> eventVec;
for (auto iter = checkPointMap.begin(); iter != checkPointMap.end(); ++iter) {
auto const result = validateCheckpoint(iter->second, cachePathDevInodeMap, eventVec);
if (!(result == ValidateCheckpointResult::kNormal || result == ValidateCheckpointResult::kRotate)) {
deleteKeyVec.push_back(iter->first);
}
}
for (size_t i = 0; i < deleteKeyVec.size(); ++i) {
checkPointMap.erase(deleteKeyVec[i]);
}
LOG_INFO(sLogger,
("checkpoint verification ends, generated event count", eventVec.size())("checkpoint deletion count",
deleteKeyVec.size()));
auto const v1EventCount = eventVec.size();
// Load exactly once checkpoints and create events from them.
// Because they are not in v1 checkpoint manager, no need to delete them.
auto exactlyOnceConfigs = FileServer::GetInstance()->GetExactlyOnceConfigs();
if (!exactlyOnceConfigs.empty()) {
static auto* sCptMV2 = CheckpointManagerV2::GetInstance();
auto exactlyOnceCpts = sCptMV2->ScanCheckpoints(exactlyOnceConfigs);
LOG_INFO(sLogger,
("start add exactly once checkpoint events",
"")("config size", exactlyOnceConfigs.size())("scanned checkpoint size", exactlyOnceCpts.size()));
vector<pair<string, PrimaryCheckpointPB>*> batchUpdateCpts;
vector<pair<string, PrimaryCheckpointPB>*> batchDeleteCpts;
for (size_t idx = 0; idx < exactlyOnceCpts.size(); ++idx) {
auto& cptPair = exactlyOnceCpts[idx];
auto& cpt = cptPair.second;
auto v1Cpt = make_shared<CheckPoint>(cpt.log_path(),
0,
cpt.sig_size(),
cpt.sig_hash(),
DevInode(cpt.dev(), cpt.inode()),
cpt.config_name(),
cpt.real_path(),
1,
0,
"",
0);
const auto result = validateCheckpoint(v1Cpt, cachePathDevInodeMap, eventVec);
switch (result) {
case ValidateCheckpointResult::kNormal:
break;
case ValidateCheckpointResult::kRotate:
LOG_INFO(sLogger,
("exactly once file rotate",
cptPair.first)("old checkpoint", cpt.DebugString())("new path", v1Cpt->mRealFileName));
cpt.set_real_path(v1Cpt->mRealFileName);
batchUpdateCpts.push_back(&cptPair);
break;
default:
batchDeleteCpts.push_back(&cptPair);
LOG_WARNING(sLogger,
("delete invalid exactly once checkpoint",
static_cast<int>(result))("key", cptPair.first)("checkpoint", cpt.DebugString()));
break;
}
}
uint64_t updateUsedTimeInMs = 0;
if (!batchUpdateCpts.empty()) {
updateUsedTimeInMs = sCptMV2->UpdatePrimaryCheckpoints(batchUpdateCpts);
}
uint64_t deleteUsedTimeInMs = 0;
if (!batchDeleteCpts.empty()) {
deleteUsedTimeInMs = sCptMV2->DeletePrimaryCheckpoints(batchDeleteCpts);
}
LOG_INFO(sLogger,
("finish add exactly once checkpoint events",
"")("cache size", cachePathDevInodeMap.size())("update size", batchUpdateCpts.size())(
"update used time", updateUsedTimeInMs)("event size", eventVec.size() - v1EventCount)(
"delete size", batchDeleteCpts.size())("delete used time", deleteUsedTimeInMs));
}
for (size_t i = 0; i < eventVec.size(); ++i) {
LOG_DEBUG(sLogger,
("event from checkpoint", i)("file", eventVec[i]->GetObject())("inode", eventVec[i]->GetInode())(
"config", eventVec[i]->GetConfigName()));
}
if (eventVec.size() > 0) {
// Sort by Source/Object (length+alphabet) in event to adjust the order of rotating files.
// eg. /log/a.log.10 -> /log/a.log.9 -> /log/a.log.8 -> ...
sort(eventVec.begin(), eventVec.end(), Event::CompareByFullPath);
LogInput::GetInstance()->PushEventQueue(eventVec);
}
}
// Stamp the registered directory `path` with the current time in mWdUpdateTimeMap.
// Returns false, and changes nothing, if `path` is not registered.
bool EventDispatcher::AddTimeoutWatch(const string& path) {
    const auto registered = mPathWdMap.find(path);
    if (registered == mPathWdMap.end()) {
        return false;
    }
    mWdUpdateTimeMap[registered->second] = time(NULL);
    return true;
}
// Record the bidirectional link path <-> wd. mWdDirInfoMap takes the DirInfo pointer, which is later freed by
// RemoveOneToOneMapEntry(). Existing entries for the same wd or path are overwritten, not freed.
void EventDispatcher::AddOneToOneMapEntry(DirInfo* dirInfo, int wd) {
    mWdDirInfoMap[wd] = dirInfo;
    mPathWdMap[dirInfo->mPath] = wd;
}
void EventDispatcher::RemoveOneToOneMapEntry(int wd) {
MapType<int, DirInfo*>::Type::iterator itr = mWdDirInfoMap.find(wd);
if (itr == mWdDirInfoMap.end())
return;
mPathWdMap.erase((itr->second)->mPath);
delete itr->second;
mWdDirInfoMap.erase(itr);
}
void EventDispatcher::DumpInotifyWatcherDirs() {
string filename = GetInotifyWatcherDirsDumpFileName();
FILE* pFile = fopen(filename.c_str(), "w");
if (pFile == NULL) {
LOG_WARNING(sLogger, ("open file (dump inotify watcher dirs) failed", filename)("errno", errno));
return;
}
string outline = string("WatchNum: ") + ToString(mWatchNum) + ", NotifyNum: " + ToString(mInotifyWatchNum)
+ ", WdUpdateTimeMap: " + ToString(mWdUpdateTimeMap.size()) + ", PathWdMap: " + ToString(mPathWdMap.size())
+ ", WdDirInfoMap: " + ToString(mWdDirInfoMap.size()) + ", BrokenLinkSet: " + ToString(mBrokenLinkSet.size())
+ "\n";
fwrite(outline.c_str(), 1, outline.size(), pFile);
string info = "directory\twatch_descriptor\n";
for (MapType<string, int>::Type::iterator iter = mPathWdMap.begin(); iter != mPathWdMap.end(); ++iter) {
if (iter->second > 0)
info.append(iter->first).append("\t").append(ToString(iter->second)).append("\n");
}
fwrite(info.c_str(), 1, info.size(), pFile);
fclose(pFile);
}
void EventDispatcher::CheckSymbolicLink() {
// consider symbolic link like this: a -> b -> c
// "a" and "b" is symbolic link, "c" is a directory, "a" is registered in inotify
vector<string> dirToCheck(mBrokenLinkSet.begin(), mBrokenLinkSet.end());
for (MapType<string, int>::Type::iterator iter = mPathWdMap.begin(); iter != mPathWdMap.end(); ++iter) {
string path = iter->first;
int wd = iter->second;
if (mWdDirInfoMap.find(wd) == mWdDirInfoMap.end())
LOG_WARNING(sLogger,
("maybe something wrong, path in mPathWdMap", path)("but wd not exist in mWdDirInfoMap", wd));
else if (mWdDirInfoMap[wd]->mIsSymbolicLink)
dirToCheck.push_back(path);
}
vector<string> dirToAdd;
for (vector<string>::iterator dirIter = dirToCheck.begin(); dirIter != dirToCheck.end(); ++dirIter) {
string path = *dirIter;
fsutil::PathStat lstatBuf;
if (!fsutil::PathStat::lstat(path, lstatBuf)) // check path itself
{
// when "a" was removed, there will be no inotify event
LOG_WARNING(sLogger, ("path not exist, remove inotify monitor", path));
if (mBrokenLinkSet.find(path) != mBrokenLinkSet.end()) {
mBrokenLinkSet.erase(path);
}
UnregisterAllDir(path);
} else {
fsutil::PathStat statBuf;
if (!fsutil::PathStat::stat(path, statBuf)) // check path refrence to
{
// when "b" was removed, there will be no inotify event
LOG_WARNING(sLogger, ("existed symbolic link invalid, remove inotify monitor", path));
UnregisterAllDir(path);
mBrokenLinkSet.insert(path);
} else if (statBuf.IsDir()) {
// when "c" or "b" was removed, a will be put into broken link set
// then the directory "c" or "b" be created with no IN_CRETATE event, should add inotify monitor for "a"
// again
if (mBrokenLinkSet.find(path) != mBrokenLinkSet.end()) {
mBrokenLinkSet.erase(path);
}
dirToAdd.push_back(path);
}
}
}
for (vector<string>::iterator iter = dirToAdd.begin(); iter != dirToAdd.end(); ++iter) {
FileDiscoveryConfig config = ConfigManager::GetInstance()->FindBestMatch(*iter);
if (config.first && IsDirRegistered(*iter) == PATH_INODE_NOT_REGISTERED)
ConfigManager::GetInstance()->RegisterHandlersRecursively(*iter, config, true);
}
}
// Delegates to the event listener, which writes the events it has read into eventVec.
void EventDispatcher::ReadInotifyEvents(vector<Event*>& eventVec) {
    auto& listener = mEventListener;
    listener->ReadEvents(eventVec);
}
// Collects every registered directory that is baseDir itself or lies beneath it, paired with
// the handler recorded for its wd.
//
// A path matches when baseDir is a prefix of it and the prefix ends on a path-component
// boundary (end of string or PATH_SEPARATOR), so "/a/b" matches "/a/b/c" but not "/a/bc".
// A path whose wd has no DirInfo (an inconsistency between mPathWdMap and mWdDirInfoMap) is
// skipped with a warning. It is not dereferenced through a default-inserted null entry.
vector<pair<string, EventHandler*>> EventDispatcher::FindAllSubDirAndHandler(const string& baseDir) {
    LOG_DEBUG(sLogger, ("Find all sub dir", baseDir));
    vector<pair<string, EventHandler*>> dirAndHandlers;
    const size_t baseDirSize = baseDir.size();
    for (MapType<string, int>::Type::iterator it = mPathWdMap.begin(); it != mPathWdMap.end(); ++it) {
        const string& pathName = it->first;
        const size_t pathNameSize = pathName.size();
        if (baseDirSize > pathNameSize) {
            continue;
        }
        if (pathName.compare(0, baseDirSize, baseDir) != 0) {
            continue;
        }
        if (pathNameSize != baseDirSize && pathName[baseDirSize] != PATH_SEPARATOR[0]) {
            continue;
        }
        // find() instead of operator[]: operator[] would insert a null DirInfo* for a missing wd.
        MapType<int, DirInfo*>::Type::iterator infoIter = mWdDirInfoMap.find(it->second);
        if (infoIter == mWdDirInfoMap.end() || infoIter->second == nullptr) {
            LOG_WARNING(sLogger,
                        ("maybe something wrong, path in mPathWdMap", pathName)("but wd not exist in mWdDirInfoMap",
                                                                                 it->second));
            continue;
        }
        dirAndHandlers.push_back(make_pair(pathName, infoIter->second->mHandler));
    }
    return dirAndHandlers;
}
void EventDispatcher::UnregisterAllDir(const string& baseDir) {
LOG_DEBUG(sLogger, ("Remove all sub dir", baseDir));
auto subDirAndHandlers = FindAllSubDirAndHandler(baseDir);
for (auto& subDirAndHandler : subDirAndHandlers) {
mTimeoutHandler->Handle(Event(subDirAndHandler.first, "", 0, 0));
}
}
void EventDispatcher::UnregisterEventHandler(const string& path) {
MapType<string, int>::Type::iterator pos = mPathWdMap.find(path);
if (pos == mPathWdMap.end())
return;
int wd = pos->second;
if (mWdDirInfoMap[wd]->mIsSymbolicLink) {
fsutil::PathStat lstatBuf;
if (fsutil::PathStat::lstat(path, lstatBuf)) // TODO: Need review, might be a bug.
{
// path(symbolic link) existed, the dir it refrence to is deleted
mBrokenLinkSet.insert(path);
}
}
RemoveOneToOneMapEntry(wd);
mWdUpdateTimeMap.erase(wd);
if (EventListener::IsValidID(wd) && mEventListener->IsInit()) {
mEventListener->RemoveWatch(wd);
mInotifyWatchNum--;
}
mWatchNum--;
LOG_INFO(sLogger, ("remove the watcher for dir", path)("wd", wd));
}
void EventDispatcher::StopAllDir(const string& baseDir, const string& containerID) {
LOG_DEBUG(sLogger, ("Stop all sub dir", baseDir));
auto subDirAndHandlers = FindAllSubDirAndHandler(baseDir);
for (auto& subDirAndHandler : subDirAndHandlers) {
Event e(subDirAndHandler.first, "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, -1, 0);
e.SetContainerID(containerID);
subDirAndHandler.second->Handle(e);
}
}
// Reports whether `path` is registered and still refers to the inode recorded at registration.
//
// stat() follows symbolic links, so for a link the inode of the referred-to target is compared.
// Returns GET_REGISTER_STATUS_ERROR when stat() fails. Returns PATH_INODE_REGISTERED when the
// path is in mPathWdMap and its DirInfo holds the same inode. Otherwise returns
// PATH_INODE_NOT_REGISTERED.
DirRegisterStatus EventDispatcher::IsDirRegistered(const string& path) {
    fsutil::PathStat statBuf;
    if (!fsutil::PathStat::stat(path, statBuf)) {
        LOG_WARNING(sLogger, ("call stat() on path fail", path)("errno", GetErrno()));
        return GET_REGISTER_STATUS_ERROR;
    }
    uint64_t inode = statBuf.GetDevInode().inode;
    MapType<string, int>::Type::iterator pathIter = mPathWdMap.find(path);
    if (pathIter == mPathWdMap.end()) {
        return PATH_INODE_NOT_REGISTERED;
    }
    // find() instead of operator[]: operator[] would insert and dereference a null DirInfo*.
    MapType<int, DirInfo*>::Type::iterator infoIter = mWdDirInfoMap.find(pathIter->second);
    if (infoIter != mWdDirInfoMap.end() && infoIter->second != nullptr && inode == infoIter->second->mInode) {
        return PATH_INODE_REGISTERED;
    }
    return PATH_INODE_NOT_REGISTERED;
}
// Returns true when `path` currently has an entry in mPathWdMap.
bool EventDispatcher::IsRegistered(const std::string& path) {
    return mPathWdMap.find(path) != mPathWdMap.end();
}
bool EventDispatcher::IsRegistered(int wd, string& path) {
MapType<int, DirInfo*>::Type::iterator itr = mWdDirInfoMap.find(wd);
if (itr == mWdDirInfoMap.end())
return false;
else {
path = (itr->second)->mPath;
return true;
}
}
void EventDispatcher::HandleTimeout() {
// increment each watcher's timeout account, if bound meets,
// call timeout handler
vector<string*> sources;
vector<EventHandler*> handlers;
time_t curTime = time(NULL);
MapType<int, time_t>::Type::iterator itr = mWdUpdateTimeMap.begin();
for (; itr != mWdUpdateTimeMap.end(); ++itr) {
if (curTime - (itr->second) > INT32_FLAG(timeout_interval)) {
// add to vector then batch process to avoid possible iterator change problem
// mHandler may remove what itr points to, thus change the layout of the map container
// what follows may not work
// Event ev(source, string(), EVENT_TIMEOUT);
// mTimoutHandler->Handle(ev);
sources.push_back(&(mWdDirInfoMap[itr->first]->mPath));
}
}
// when we reach this function, for any dir p and its
// descendant dir c, we have p is at least as newer as c
// but in vector sources, p may appear before c, this is
// not a problem if what we do is irrelevant to the order whose
// timeout handler is called when both are timeout at the same time.
vector<string*>::iterator itr1 = sources.begin();
for (; itr1 != sources.end(); ++itr1) {
Event ev(*(*itr1), string(), EVENT_TIMEOUT, 0);
mTimeoutHandler->Handle(ev);
}
}
void EventDispatcher::PropagateTimeout(const std::string& path) {
auto pathpos = mPathWdMap.find(path);
if (pathpos == mPathWdMap.end()) {
// walkarond of bug#5760293, should find the scenarios
AlarmManager::GetInstance()->SendAlarm(INVALID_MEMORY_ACCESS_ALARM,
"PropagateTimeout access invalid key of mPathWdMap, path : " + path);
LOG_ERROR(sLogger, ("PropagateTimeout access invalid key of mPathWdMap, path", path));
return;
}
string tmp(path);
auto pos = mWdUpdateTimeMap.find(pathpos->second);
time_t curTime = time(nullptr);
while (pos != mWdUpdateTimeMap.end()) {
pos->second = curTime;
auto slashpos = tmp.rfind('/');
if (slashpos == string::npos)
break;
tmp.resize(slashpos);
pathpos = mPathWdMap.find(tmp);
if (pathpos != mPathWdMap.end())
pos = mWdUpdateTimeMap.find(pathpos->second);
else
break;
}
}
// Resets every watched directory's last-update timestamp to the current time.
void EventDispatcher::StartTimeCount() {
    const time_t now = time(NULL);
    for (auto& wdAndTime : mWdUpdateTimeMap) {
        wdAndTime.second = now;
    }
}
void EventDispatcher::DumpAllHandlersMeta(bool remove) {
MapType<int, DirInfo*>::Type::iterator it;
vector<int> timeout;
for (it = mWdDirInfoMap.begin(); it != mWdDirInfoMap.end(); ++it) {
((it->second)->mHandler)->DumpReaderMeta(true, remove);
}
for (it = mWdDirInfoMap.begin(); it != mWdDirInfoMap.end(); ++it) {
((it->second)->mHandler)->DumpReaderMeta(false, remove);
timeout.push_back(it->first);
if (remove)
ConfigManager::GetInstance()->AddHandlerToDelete((it->second)->mHandler);
}
for (size_t i = 0; i < timeout.size(); ++i) {
int wd = timeout[i];
string path = mWdDirInfoMap[wd]->mPath;
if (remove) {
UnregisterEventHandler(path);
ConfigManager::GetInstance()->RemoveHandler(path, false);
if (ConfigManager::GetInstance()->FindBestMatch(path).first == NULL) {
continue;
}
}
CheckPointManager::Instance()->AddDirCheckPoint(path);
}
LOG_INFO(sLogger, ("save log reader status", "succeeded"));
}
// Calls HandleTimeOut() on the handler of every registered directory.
void EventDispatcher::ProcessHandlerTimeOut() {
    for (auto& wdAndInfo : mWdDirInfoMap) {
        auto handler = wdAndInfo.second->mHandler;
        handler->HandleTimeOut();
    }
}
// Runs DumpCheckPoint() only when the checkpoint manager reports a dump is needed at curTime.
void EventDispatcher::DumpCheckPointPeriod(int32_t curTime) {
    if (!CheckPointManager::Instance()->NeedDump(curTime)) {
        return;
    }
    DumpCheckPoint();
}
// Writes all checkpoints to local storage while the file server is paused.
//
// Steps, in order:
//   1. Pause the file server.
//   2. Dump every handler's reader meta without removing handlers.
//   3. Persist the checkpoints locally.
//   4. Clear the in-memory checkpoints.
//   5. Resume the file server.
// A failure to persist is only logged; the clear and resume still happen.
void EventDispatcher::DumpCheckPoint() {
    LOG_INFO(sLogger, ("checkpoint dump", "starts"));
    FileServer::GetInstance()->Pause(false);
    DumpAllHandlersMeta(false);
    const bool dumped = CheckPointManager::Instance()->DumpCheckPointToLocal();
    if (dumped) {
        LOG_DEBUG(sLogger, ("dump checkpoint to local", "succeeded"));
    } else {
        LOG_WARNING(sLogger, ("dump checkpoint to local", "failed"));
    }
    // once saved, the in-memory checkpoints are no longer needed
    CheckPointManager::Instance()->RemoveAllCheckPoint();
    FileServer::GetInstance()->Resume(false);
    LOG_INFO(sLogger, ("checkpoint dump", "succeeded"));
}
// Returns true only if every registered handler reports that all of its files have been read.
// Stops querying at the first handler that reports false.
bool EventDispatcher::IsAllFileRead() {
    bool allRead = true;
    for (auto& wdAndInfo : mWdDirInfoMap) {
        if (!wdAndInfo.second->mHandler->IsAllFileRead()) {
            allRead = false;
            break;
        }
    }
    return allRead;
}
#ifdef APSARA_UNIT_TEST_MAIN
// Unit-test helper that resets the dispatcher to an empty state. Steps, in order:
//   1. Wait two seconds.
//   2. Clear the registration maps, freeing every DirInfo.
//   3. Re-create the event listener.
//   4. Stop both polling components.
//   5. Clear the polling event queue and drop all checkpoints.
void EventDispatcher::CleanEnviroments() {
    sleep(2); // INT32_FLAG(ilogtail_epoll_time_out) + 1
    mPathWdMap.clear();
    for (auto& wdAndInfo : mWdDirInfoMap) {
        delete wdAndInfo.second;
    }
    mWdDirInfoMap.clear();
    mBrokenLinkSet.clear();
    mWdUpdateTimeMap.clear();
    mEventListener->Destroy();
    mEventListener->Init();
    PollingDirFile::GetInstance()->Stop();
    PollingModify::GetInstance()->Stop();
    PollingEventQueue::GetInstance()->Clear();
    CheckPointManager::Instance()->RemoveAllCheckPoint();
}
// Unit-test helper: counts registered directories whose wd is non-negative.
int32_t EventDispatcher::GetInotifyWatcherCount() {
    int32_t count = 0;
    for (const auto& wdAndInfo : mWdDirInfoMap) {
        count += (wdAndInfo.first >= 0) ? 1 : 0;
    }
    return count;
}
#endif
} // namespace logtail