core/file_server/event_handler/EventHandler.cpp (933 lines of code) (raw):
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "EventHandler.h"
#include <iostream>
#include <string>
#include <vector>
#include "app_config/AppConfig.h"
#include "collection_pipeline/queue/ProcessQueueManager.h"
#include "common/FileSystemUtil.h"
#include "common/RuntimeUtil.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "file_server/event/BlockEventManager.h"
#include "file_server/event_handler/LogInput.h"
#include "logger/Logger.h"
#include "monitor/AlarmManager.h"
#include "runner/ProcessorRunner.h"
using namespace std;
using namespace sls_logs;
// Base unit, in microseconds, of the time budget for one read pass over a file.
// ModifyHandler's constructor scales this value by the config priority.
DEFINE_FLAG_INT64(read_file_time_slice, "microseconds", 25 * 1000);
// Idle time, in seconds, after which a reader is considered timed out.
DEFINE_FLAG_INT32(logreader_timeout_interval,
"reader hasn't updated for a long time will be removed, seconds",
86400 * 20000); // roughly equivalent to not releasing logReader when timed out
// Idle time, in seconds, after which a rotate cookie is dropped (per the flag description).
DEFINE_FLAG_INT32(cookie_timeout_interval,
"rotate cookie hasn't updated for a long time will be removed, seconds",
1800);
// Per-directory reader-count thresholds; see each description for the clean-up policy they trigger.
// NOTE(review): the code consuming these two thresholds is not visible in this part of the file.
DEFINE_FLAG_INT32(logreader_count_upperlimit,
"when reader (in one dir) count exceed limit, reader hasn't updated for 7 * 24 hours will be removed",
5000);
DEFINE_FLAG_INT32(logreader_count_maxlimit,
"when reader (in one dir) count exceed limit, reader hasn't updated for 24 hours will be removed",
10000);
// ModifyHandler::MakeSpaceForNewReader starts evicting once the reader count reaches
// logreader_count_max + logreader_count_max_remove_count, and trims the count back to logreader_count_max.
DEFINE_FLAG_INT32(logreader_count_max,
"when reader (in one dir) count exceed limit, modify handler will remove old reader ",
100000);
DEFINE_FLAG_INT32(logreader_count_max_remove_count, "remove reader count ", 10000);
// Delay, in seconds, before the reader of a deleted file is removed (per the flag description).
DEFINE_FLAG_INT32(logreader_filedeleted_remove_interval,
"when file is deleted, reader will be removed after seconds",
900);
// Delay, in seconds, before the reader of a rotated file is removed (per the flag description).
DEFINE_FLAG_INT32(logreader_filerotate_remove_interval,
"when file is rotate, reader will be removed after seconds",
600);
// Minimum gap, in seconds, between two "reader queue overflow" error reports emitted by
// ModifyHandler::CreateLogFileReaderPtr.
DEFINE_FLAG_INT32(rotate_overflow_error_interval, "second", 60);
namespace logtail {
void NormalEventHandler::Handle(const Event& event) {
bool fileCreateModify = false;
if (event.IsCreate() || event.IsMoveTo()) {
if (event.IsDir()) {
mCreateHandlerPtr->Handle(event);
} else {
string fullPath = PathJoin(event.GetSource(), event.GetObject());
fsutil::PathStat buf;
if (!fsutil::PathStat::stat(fullPath, buf)) {
// filename before rollback is not exist this moment
LOG_DEBUG(sLogger, ("get path info error", fullPath));
} else if (buf.IsDir()) {
// register consider timeout
mCreateHandlerPtr->Handle(event);
} else if (!buf.IsRegFile()) {
LOG_INFO(sLogger, ("path is not file or directory, ignore it", fullPath)("stat mode", buf.GetMode()));
AlarmManager::GetInstance()->SendAlarm(UNEXPECTED_FILE_TYPE_MODE_ALARM,
string("found unexpected type mode: ") + ToString(buf.GetMode())
+ ", file path: " + fullPath);
return;
} else if (event.IsCreate())
fileCreateModify = true;
}
} else if (event.IsModify())
fileCreateModify = true;
if (fileCreateModify) {
// register a new handler for source
// match config, then register
auto const& path = event.GetSource();
// warning: following case should occur:
// a/b & a/b/c both in configuration, but requires
// its descendants being watched timeout
// while the other not
const string& name = event.GetObject();
// the file suffix which will be ignored
if (!IsValidSuffix(name))
return;
FileDiscoveryConfig config = make_pair(nullptr, nullptr);
if (event.GetConfigName().empty()) {
config = ConfigManager::GetInstance()->FindBestMatch(path, name);
} else {
config = FileServer::GetInstance()->GetFileDiscoveryConfig(event.GetConfigName());
}
if (config.first) {
LOG_DEBUG(sLogger,
("register dir", PathJoin(path, name))("logstore", config.second->GetLogstoreName())(
"max depth", config.first->mMaxDirSearchDepth));
EventHandler* newHandler = new CreateModifyHandler(mCreateHandlerPtr);
EventHandler* handler = newHandler;
if (EventDispatcher::GetInstance()->RegisterEventHandler(path, config, handler)) {
if (handler != newHandler)
delete newHandler;
else
ConfigManager::GetInstance()->AddNewHandler(path, handler);
handler->Handle(event);
} else {
delete handler;
}
// from now on, it's ret who is responsible for this directory
// And when it's timeout configmanager will delete it
}
}
}
// Forwards the timeout notification to the create handler.
void NormalEventHandler::HandleTimeOut() {
    // currently an empty call
    mCreateHandlerPtr->HandleTimeOut();
}
// CreateHandler implementation
// xxx: event.IsDir() true!
void CreateHandler::Handle(const Event& event) {
// warning: following case should occur:
// a/b & a/b/c both in configuration, but requires
// its descendants being watched timeout
// while the other not
string object = event.GetObject();
string path = event.GetSource();
if (object.size() > 0)
path += PATH_SEPARATOR + object;
if (!object.empty() && '.' == object[0]) {
LOG_INFO(sLogger, ("ignore hidden file or directory", path));
return;
}
FileDiscoveryConfig config = ConfigManager::GetInstance()->FindBestMatch(path);
if (!config.first)
return;
else if (event.IsDir())
ConfigManager::GetInstance()->RegisterHandlers(path, config);
else {
// symbolic link
if (EventDispatcher::GetInstance()->IsDirRegistered(path) == PATH_INODE_NOT_REGISTERED) {
// TODO: why not use RegisterHandlers
ConfigManager::GetInstance()->RegisterHandlersRecursively(path, config, true);
}
}
}
// The create handler keeps no per-directory state, so a timeout needs no action.
void CreateHandler::HandleTimeOut() {
}
// TimeoutHandler implementation
void TimeoutHandler::Handle(const Event& ev) {
const string& dir = ev.GetSource();
EventDispatcher::GetInstance()->UnregisterEventHandler(dir);
ConfigManager::GetInstance()->RemoveHandler(dir);
CheckPointManager::Instance()->DeleteDirCheckPoint(dir);
}
// Intentionally empty: the timeout handler itself has nothing to expire.
void TimeoutHandler::HandleTimeOut() {
}
// Asks every per-config ModifyHandler to dump its reader metadata.
// Both flags are passed through unchanged; the individual results are
// ignored and the function always returns true.
bool CreateModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) {
    for (auto& entry : mModifyHandlerPtrMap) {
        entry.second->DumpReaderMeta(isRotatorReader, checkConfigFlag);
    }
    return true;
}
// Returns true only when every per-config ModifyHandler reports that all of
// its files have been read; stops at the first handler that has not.
bool CreateModifyHandler::IsAllFileRead() {
    for (auto& entry : mModifyHandlerPtrMap) {
        if (!entry.second->IsAllFileRead()) {
            return false;
        }
    }
    return true;
}
// Returns the ModifyHandler registered under configName, creating and
// registering one from pConfig on first use. The returned pointer stays owned
// by this CreateModifyHandler, whose destructor deletes it.
ModifyHandler* CreateModifyHandler::GetOrCreateModifyHandler(const std::string& configName,
                                                             const FileDiscoveryConfig& pConfig) {
    const auto found = mModifyHandlerPtrMap.find(configName);
    if (found == mModifyHandlerPtrMap.end()) {
        ModifyHandler* created = new ModifyHandler(configName, pConfig);
        mModifyHandlerPtrMap.insert(std::make_pair(configName, created));
        return created;
    }
    return found->second;
}
// Releases every ModifyHandler created by GetOrCreateModifyHandler.
CreateModifyHandler::~CreateModifyHandler() {
    for (auto& entry : mModifyHandlerPtrMap) {
        delete entry.second;
    }
    mModifyHandlerPtrMap.clear();
}
// CreateModifyHandler implementation
void CreateModifyHandler::Handle(const Event& event) {
bool isDir = false;
auto path = std::string(event.GetSource()).append(PATH_SEPARATOR).append(event.GetObject());
if (event.IsDir()) {
isDir = true;
} else if (event.IsCreate() || event.IsMoveTo()) {
fsutil::PathStat buf;
if (!fsutil::PathStat::stat(path, buf))
LOG_DEBUG(sLogger, ("get path info error", path));
else if (buf.IsDir())
isDir = true;
else if (!buf.IsRegFile()) {
LOG_INFO(sLogger, ("path is not file or directory, ignore it", path)("stat mode", buf.GetMode()));
AlarmManager::GetInstance()->SendAlarm(UNEXPECTED_FILE_TYPE_MODE_ALARM,
std::string("found unexpected type mode: ") + ToString(buf.GetMode())
+ ", file path: " + path);
return;
}
}
if ((event.IsCreate() || event.IsMoveTo()) && isDir) {
mCreateHandlerPtr->Handle(event);
} else if (event.IsContainerStopped() && isDir) {
for (auto& pair : mModifyHandlerPtrMap) {
LOG_DEBUG(sLogger,
("Handle container stopped event, config", pair.first)("Source", event.GetSource())(
"Object", event.GetObject())("Dev", event.GetDev())("Inode", event.GetInode()));
pair.second->Handle(event);
}
} else if (event.IsCreate() || event.IsModify() || event.IsMoveFrom() || event.IsMoveTo() || event.IsDeleted()) {
if (!event.GetConfigName().empty()) {
FileDiscoveryConfig pConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(event.GetConfigName());
if (pConfig.first) {
LOG_DEBUG(sLogger,
("Process event with existed config", event.GetConfigName())("Source", event.GetSource())(
"Object", event.GetObject())("Dev", event.GetDev())("Inode", event.GetInode()));
GetOrCreateModifyHandler(pConfig.second->GetConfigName(), pConfig)->Handle(event);
} else {
// if event is delete
LOG_WARNING(sLogger, ("can not find config, config may be deleted", event.GetConfigName()));
}
} else {
vector<FileDiscoveryConfig> pConfigVec;
if (AppConfig::GetInstance()->IsAcceptMultiConfig()) {
ConfigManager::GetInstance()->FindAllMatch(pConfigVec, event.GetSource(), event.GetObject());
} else {
ConfigManager::GetInstance()->FindMatchWithForceFlag(pConfigVec, event.GetSource(), event.GetObject());
}
for (auto configIter = pConfigVec.begin(); configIter != pConfigVec.end(); ++configIter) {
LOG_DEBUG(sLogger,
("Process event with multi config", pConfigVec.size())(event.GetSource(), event.GetObject()));
GetOrCreateModifyHandler(configIter->second->GetConfigName(), *configIter)->Handle(event);
}
}
}
}
// Propagates the timeout to every per-config ModifyHandler, then to the create handler.
void CreateModifyHandler::HandleTimeOut() {
    for (auto& entry : mModifyHandlerPtrMap) {
        entry.second->HandleTimeOut();
    }
    // currently an empty call
    mCreateHandlerPtr->HandleTimeOut();
}
// implementation for ModifyHandler
// Builds the modify handler for one collection config.
//
// configName: name of the config this handler serves; stored in mConfigName.
// pConfig:    discovery config pair. pConfig.second is dereferenced here to
//             read the global config priority, so it must not be null.
ModifyHandler::ModifyHandler(const std::string& configName, const FileDiscoveryConfig& pConfig)
    : mConfigName(configName) {
    // Time slice = 2^(sMaxPriority - priority) * read_file_time_slice.
    // default is 2 * INT64_FLAG(read_file_time_slice)
    //
    // The shift has to be parenthesised: `*` binds tighter than `<<`, so the
    // unparenthesised form shifts 1 by (gap * slice) bits, which is undefined
    // behaviour for any non-zero priority gap.
    // NOTE(review): assumes mPriority <= sMaxPriority; a negative shift count
    // would also be undefined - confirm against config validation.
    mReadFileTimeSlice = (1 << (ProcessQueueManager::sMaxPriority - pConfig.second->GetGlobalConfig().mPriority))
        * INT64_FLAG(read_file_time_slice);
    mLastOverflowErrorTime = 0;
}
// Nothing to release explicitly; members clean up after themselves.
ModifyHandler::~ModifyHandler() = default;
void ModifyHandler::MakeSpaceForNewReader() {
if (mDevInodeReaderMap.size()
< (size_t)INT32_FLAG(logreader_count_max) + (size_t)INT32_FLAG(logreader_count_max_remove_count)) {
return;
}
vector<LogFileReader*> sortReaderArray;
sortReaderArray.resize(mDevInodeReaderMap.size());
size_t index = 0;
for (DevInodeLogFileReaderMap::iterator iter = mDevInodeReaderMap.begin(); iter != mDevInodeReaderMap.end();
++iter) {
sortReaderArray[index++] = iter->second.get();
}
// little to big
sort(sortReaderArray.begin(), sortReaderArray.end(), ModifyHandler::CompareReaderByUpdateTime);
const int32_t deleteCount = (int32_t)mDevInodeReaderMap.size() - INT32_FLAG(logreader_count_max);
for (int i = 0; i < deleteCount; ++i) {
LogFileReader* pReader = sortReaderArray[i];
mDevInodeReaderMap.erase(pReader->GetDevInode());
LogFileReaderPtrArray& readerArray = *pReader->GetReaderArray();
for (LogFileReaderPtrArray::iterator iter = readerArray.begin(); iter != readerArray.end(); ++iter) {
if (iter->get() == pReader) {
readerArray.erase(iter);
break;
}
}
}
LOG_WARNING(sLogger,
("remove some of the old readers from the log rotator queue",
"total log reader count exceeds upper limit")("reader count after clean", mDevInodeReaderMap.size()));
// randomly choose one project to send alarm
LogFileReaderPtr oneReader = mDevInodeReaderMap.begin()->second;
AlarmManager::GetInstance()->SendAlarm(
FILE_READER_EXCEED_ALARM,
string("total log reader count exceeds upper limit, delete some of the old readers, reader count after clean:")
+ ToString(mDevInodeReaderMap.size()),
oneReader->GetRegion(),
oneReader->GetProject(),
oneReader->GetConfigName(),
oneReader->GetLogstore());
}
// Creates a LogFileReader for the file `name` under `path` (identified by
// `devInode`) and registers it with this handler.
//
// All config arguments, `exactlyonceConcurrency` and `forceBeginingFlag` are
// passed through to LogFileReader::CreateLogFileReader; readerConfig is also
// used here for the queue size limit and for log/alarm fields.
//
// Returns the registered reader, or an empty LogFileReaderPtr when:
//   - the factory returned null;
//   - the reader queue for `name` is full and the reader is a new one
//     (its checkpoint index is CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY);
//   - opening the file failed with an errno other than EMFILE;
//   - the file signature/offset check failed;
//   - the checkpoint index is negative but not one of the known sentinels.
LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
const string& name,
const DevInode& devInode,
const FileReaderConfig& readerConfig,
const MultilineConfig& multilineConfig,
const FileDiscoveryConfig& discoveryConfig,
const FileTagConfig& tagConfig,
uint32_t exactlyonceConcurrency,
bool forceBeginingFlag) {
// Log only when this call is about to introduce a new reader queue for `name`.
if (mNameReaderMap.find(name) == mNameReaderMap.end()) {
LOG_INFO(sLogger,
("create new log reader queue, project",
readerConfig.second->GetProjectName())("logstore", readerConfig.second->GetLogstoreName())(
"config", readerConfig.second->GetConfigName())("log reader queue name", PathJoin(path, name))(
"new log reader queue count", mNameReaderMap.size() + 1));
}
// NOTE(review): operator[] presumably inserts an empty queue when `name` is absent (std::map-like
// semantics), and that queue would stay even if the function later returns an empty pointer - confirm.
LogFileReaderPtrArray& readerArray = mNameReaderMap[name];
LogFileReaderPtr readerPtr(LogFileReader::CreateLogFileReader(path,
name,
devInode,
readerConfig,
multilineConfig,
discoveryConfig,
tagConfig,
exactlyonceConcurrency,
forceBeginingFlag));
if (readerPtr.get() == NULL)
return LogFileReaderPtr();
// Queue overflow: refuse a brand-new reader once the queue has reached mRotatorQueueSize.
// Readers with any other checkpoint index are not subject to this limit.
if (readerArray.size() >= readerConfig.first->mRotatorQueueSize
&& readerPtr->GetIdxInReaderArrayFromLastCpt() == LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY) {
int32_t nowTime = time(NULL);
// Rate-limit the error log and alarm to one per rotate_overflow_error_interval seconds.
if (nowTime - mLastOverflowErrorTime > INT32_FLAG(rotate_overflow_error_interval)) {
mLastOverflowErrorTime = nowTime;
LOG_ERROR(
sLogger,
("stop creating new reader",
"log reader queue length excceeds upper limit")("project", readerConfig.second->GetProjectName())(
"logstore", readerConfig.second->GetLogstoreName())("config", readerConfig.second->GetConfigName())(
"log reader queue name", PathJoin(path, name))("max queue length",
readerConfig.first->mRotatorQueueSize));
AlarmManager::GetInstance()->SendAlarm(
DROP_LOG_ALARM,
string("log reader queue length excceeds upper limit, stop creating new reader, config: ")
+ readerConfig.second->GetConfigName() + ", log reader queue name: " + PathJoin(path, name)
+ ", max queue length: " + to_string(readerConfig.first->mRotatorQueueSize),
readerConfig.second->GetRegion(),
readerConfig.second->GetProjectName(),
readerConfig.second->GetConfigName(),
readerConfig.second->GetLogstoreName());
}
return LogFileReaderPtr();
}
LOG_INFO(sLogger,
("start to create log reader, project",
readerConfig.second->GetProjectName())("logstore", readerConfig.second->GetLogstoreName())(
"config", readerConfig.second->GetConfigName())("log reader queue name", PathJoin(path, name))(
"file device", ToString(devInode.dev))("file inode", ToString(devInode.inode)));
// new log
// backFlag == true: the reader's real path is empty or equals its host path, i.e. it is treated
// as the current (non-rotated) file and is appended to the back of the queue.
bool backFlag = false;
if (readerPtr->GetRealLogPath().empty() || readerPtr->GetRealLogPath() == readerPtr->GetHostLogPath()) {
backFlag = true;
// if reader is a new file(not from checkpoint), and file is rotate file, reset file pos
if (readerArray.size() > 0 && !readerPtr->IsFromCheckPoint()) {
LOG_DEBUG(sLogger, ("file rotate, reset new reader pos", PathJoin(path, name)));
readerPtr->ResetLastFilePos();
}
} else {
backFlag = false;
// rotate log, push front
LOG_DEBUG(sLogger, ("rotator log, push front", readerPtr->GetRealLogPath()));
}
// need check skip flag first and if flag is false then open fd
if (!readerPtr->NeedSkipFirstModify()) {
if (!readerPtr->UpdateFilePtr()) {
// errno is inspected right after the failed UpdateFilePtr call. EMFILE does not abort
// creation: the function falls through and still registers the reader.
if (errno != EMFILE) {
LOG_ERROR(
sLogger,
("stop creating new reader",
"file open failed or file dev inode has been changed since event happened")(
"project", readerConfig.second->GetProjectName())("logstore",
readerConfig.second->GetLogstoreName())(
"config", readerConfig.second->GetConfigName())("log reader queue name", PathJoin(path, name))(
"file device", ToString(devInode.dev))("file inode", ToString(devInode.inode)));
return LogFileReaderPtr();
}
} else {
// if first create, we should check and update file signature, because when blocked, new reader will have no
// chance to update signature
if (!readerPtr->CheckFileSignatureAndOffset(false)) {
LOG_ERROR(
sLogger,
("stop creating new reader",
"check file signature failed, possibly because file signature has been changed since the "
"checkpoint was last saved")("project", readerConfig.second->GetProjectName())(
"logstore", readerConfig.second->GetLogstoreName())(
"config", readerConfig.second->GetConfigName())("log reader queue name", PathJoin(path, name))(
"file device", ToString(devInode.dev))("file inode", ToString(devInode.inode)));
return LogFileReaderPtr();
}
}
}
// Registration: decide which container(s) take the reader, based on backFlag and on the
// index the reader had in its queue at the last checkpoint.
int32_t idx = readerPtr->GetIdxInReaderArrayFromLastCpt();
if (backFlag) { // new reader
readerArray.push_back(readerPtr);
mDevInodeReaderMap[devInode] = readerPtr;
} else if (idx == LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY) { // reader not in reader array
mRotatorReaderMap[devInode] = readerPtr;
} else if (idx >= 0) { // reader in reader array
readerArray.push_back(readerPtr);
mDevInodeReaderMap[devInode] = readerPtr;
// re-order the queue by checkpoint index after appending the restored reader
std::stable_sort(readerArray.begin(), readerArray.end(), ModifyHandler::CompareReaderByIdxFromCpt);
} else {
LOG_WARNING(sLogger,
("unexpected idx (perhaps because first checkpoint load after upgrade)",
idx)("real log path", readerPtr->GetRealLogPath())("host log path", readerPtr->GetHostLogPath()));
return LogFileReaderPtr();
}
// The reader keeps a pointer to its queue; this is set even when the reader was placed only in
// mRotatorReaderMap and not pushed into readerArray.
readerPtr->SetReaderArray(&readerArray);
LOG_INFO(sLogger,
("log reader creation succeed",
"pushed into the corresponding reader queue")("project", readerConfig.second->GetProjectName())(
"logstore", readerConfig.second->GetLogstoreName())("config", readerConfig.second->GetConfigName())(
"log reader queue name", PathJoin(path, name))("log reader queue length", readerArray.size())(
"file device", ToString(devInode.dev))("file inode", ToString(devInode.inode))(
"file size", readerPtr->GetFileSize())("reading start position", readerPtr->GetLastFilePos()));
return readerPtr;
}
void ModifyHandler::Handle(const Event& event) {
const string& path = event.GetSource();
const string& name = event.GetObject();
if (!IsValidSuffix(name))
return;
DevInode devInode(event.GetDev(), event.GetInode());
string logPath(path);
logPath.append(PATH_SEPARATOR).append(name);
// devInode is known to be invalid for inotify events
if (!devInode.IsValid() && (event.IsModify() || event.IsCreate() || event.IsMoveTo())) {
devInode = GetFileDevInode(logPath);
if (!devInode.IsValid()) {
// call stat failed, but we should try to find reader because the log file may be moved to another name
LOG_DEBUG(sLogger, ("get file dev inode error", logPath));
}
}
DevInodeLogFileReaderMap::iterator devInodeIter
= devInode.IsValid() ? mDevInodeReaderMap.find(devInode) : mDevInodeReaderMap.end();
// when file is deleted or movefrom, we can't find devinode, so set all log reader's delete flag
if (event.IsDeleted() || event.IsMoveFrom()) {
NameLogFileReaderMap::iterator iter = mNameReaderMap.find(name);
if (iter != mNameReaderMap.end()) {
LogFileReaderPtrArray& readerArray = iter->second;
// only set when reader array size is 1
if (readerArray.size() == (size_t)1) {
readerArray[0]->SetFileDeleted(true);
if (readerArray[0]->IsReadToEnd() || readerArray[0]->ShouldForceReleaseDeletedFileFd()) {
if (readerArray[0]->IsFileOpened()) {
LOG_INFO(
sLogger,
("close the log queue header file",
"delete event has come, and only one reader exists in the corresponding log queue, and "
"the queue header file has been read or is forced to close")(
"project", readerArray[0]->GetProject())("logstore", readerArray[0]->GetLogstore())(
"config", mConfigName)("log reader queue name", readerArray[0]->GetHostLogPath())(
"file device", readerArray[0]->GetDevInode().dev)(
"file inode", readerArray[0]->GetDevInode().inode)("file size",
readerArray[0]->GetFileSize()));
// release fd as quick as possible
readerArray[0]->CloseFilePtr();
}
}
}
}
} else if (event.IsContainerStopped()) {
for (auto& pair : mNameReaderMap) {
LogFileReaderPtrArray& readerArray = pair.second;
for (auto& reader : readerArray) {
if (reader->GetContainerID() != event.GetContainerID()) {
continue;
}
reader->SetContainerStopped();
if (reader->IsReadToEnd() || reader->ShouldForceReleaseDeletedFileFd()) {
if (reader->IsFileOpened()) {
LOG_INFO(
sLogger,
("close the file",
"the container has been stopped, and current file has been read or is forced to close")(
"project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
"file size", reader->GetFileSize())("container id", event.GetContainerID()));
if (!readerArray[0]->ShouldForceReleaseDeletedFileFd() && reader->HasDataInCache()) {
ForceReadLogAndPush(readerArray[0]);
}
// release fd as quick as possible
reader->CloseFilePtr();
}
}
}
}
} else if (event.IsModify()) {
// devInode cannot be found, this means a rotate file(like a.log.1) has event, and reader for rotate file is
// moved to mRotatorReaderMap
if (devInode.IsValid() && devInodeIter == mDevInodeReaderMap.end()) {
DevInodeLogFileReaderMap::iterator rotateIter = mRotatorReaderMap.find(devInode);
// the reader for file(whether it's a.log or a.log.1) exists in mDevInodeReaderMap or mRotatorReaderMap
// if we can find reader in mRotatorReaderMap, it means the file after rotating(a.log.1) also matches config
if (rotateIter != mRotatorReaderMap.end()) {
// if dev inode change, just return.
// if find devinode in rotator map, we should check file signature
// if sig is different, delete this reader and create new one
// if sig is same and file size not change, ignore it
// if sig is same and file size is changed, move reader to normal reader map
LogFileReaderPtr rotatorReader = rotateIter->second;
LogFileReader::FileCompareResult cmpRst = rotatorReader->CompareToFile(logPath);
LOG_DEBUG(sLogger,
("find rotator reader", logPath)("compare result", (int)cmpRst)(
rotatorReader->GetHostLogPath(), rotatorReader->GetRealLogPath())(
ToString(rotatorReader->GetLastFilePos()), devInode.inode)("this", (uint64_t)this));
switch (cmpRst) {
case LogFileReader::FileCompareResult_DevInodeChange:
return;
break;
case LogFileReader::FileCompareResult_SigChange:
mRotatorReaderMap.erase(rotateIter);
break;
case LogFileReader::FileCompareResult_SigSameSizeChange: {
rotatorReader->UpdateLogPath(logPath);
LogFileReaderPtrArray& readerArray = mNameReaderMap[rotatorReader->GetHostLogPathFile()];
// new log
if (rotatorReader->GetRealLogPath() == rotatorReader->GetHostLogPath()) {
readerArray.push_back(rotatorReader);
} else {
// rotate log, push front
readerArray.push_front(rotatorReader);
}
rotatorReader->SetReaderArray(&readerArray);
mDevInodeReaderMap[devInode] = rotatorReader;
mRotatorReaderMap.erase(rotateIter);
devInodeIter = mDevInodeReaderMap.find(devInode);
} break;
case LogFileReader::FileCompareResult_SigSameSizeSame:
return;
break;
case LogFileReader::FileCompareResult_Error:
return;
break;
default:
return;
}
}
}
uint64_t beginTime = GetCurrentTimeInMicroSeconds();
LogFileReaderPtrArray* readerArrayPtr = NULL;
if (!devInode.IsValid()) {
// call stat failed, but we should try to find reader because the log file may be moved to another name
NameLogFileReaderMap::iterator iter = mNameReaderMap.find(name);
if (iter != mNameReaderMap.end()) {
readerArrayPtr = &(iter->second);
} else {
LOG_WARNING(sLogger, ("can not find logreader, may be deleted", logPath));
return;
}
} else if (devInodeIter == mDevInodeReaderMap.end()) {
FileDiscoveryConfig discoveryConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName);
// double check
// if event with config name, skip check
if (discoveryConfig.first
&& (!event.GetConfigName().empty() || discoveryConfig.first->IsMatch(path, name))) {
FileReaderConfig readerConfig = FileServer::GetInstance()->GetFileReaderConfig(mConfigName);
MultilineConfig multilineConfig = FileServer::GetInstance()->GetMultilineConfig(mConfigName);
FileTagConfig tagConfig = FileServer::GetInstance()->GetFileTagConfig(mConfigName);
uint32_t concurrency = FileServer::GetInstance()->GetExactlyOnceConcurrency(mConfigName);
LogFileReaderPtr readerPtr = CreateLogFileReaderPtr(
path, name, devInode, readerConfig, multilineConfig, discoveryConfig, tagConfig, concurrency);
if (readerPtr.get() == NULL) {
LogFileReaderPtrArray& readerArray = mNameReaderMap[name];
// if rotate queue is full, try read array header
if (readerArray.size() >= readerConfig.first->mRotatorQueueSize) {
readerPtr = readerArray[0];
// push modify event, use head dev inode
// Event* ev = new Event(event.GetSource(), event.GetObject(), event.GetType(), event.GetWd(),
// event.GetCookie(), readerArray[0]->GetDevInode().dev, readerArray[0]->GetDevInode().inode);
// LogInput::GetInstance()->PushEventQueue(ev);
} else {
// other fail, return
return;
}
}
// if we need to skip first modify, reset flag and return
if (readerPtr->NeedSkipFirstModify()) {
readerPtr->DisableSkipFirstModify();
return;
}
readerArrayPtr = readerPtr->GetReaderArray();
} else {
return;
}
} else {
devInodeIter->second->UpdateLogPath(logPath);
readerArrayPtr = devInodeIter->second->GetReaderArray();
}
if (readerArrayPtr->size() == 0) {
LOG_ERROR(sLogger, ("unknow error, reader array size is 0", logPath));
return;
}
LogFileReaderPtr reader = (*readerArrayPtr)[0];
// If file modified, it means the file is existed, then we should set fileDeletedFlag to false
// NOTE: This may override the correct delete flag, which will cause fd close delay!
// reader->SetFileDeleted(false);
// make sure file open success, or we just return
bool isFileOpen = reader->IsFileOpened();
while (!reader->UpdateFilePtr()) {
if (event.IsReaderFlushTimeout()) {
break;
}
if (errno == EMFILE) {
LOG_WARNING(sLogger,
("too many open files", "skip this read operation")("log path", reader->GetHostLogPath()));
return;
}
// eg: a.log rotate to a.log1, event sequece : a.log write 2min ago, file ptr closed -> a.log rotate a.log1
// -> logtail process new a.log modify -> open file fail -> [old] delete reader -> logtail process a.log1
// modify
// -> cannot find reader, treat as new file -> read log tail(1MB)
// so when open file ptr faild, put this reader into rotator map, when process a.log1 modify event, we can
// find it in rotator map
LOG_INFO(sLogger,
("open the file failed", "move the corresponding reader to the rotator reader pool")(
"project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
"log reader queue name", reader->GetHostLogPath())("log reader queue size",
readerArrayPtr->size() - 1)(
"file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
"file size", reader->GetFileSize())("rotator reader pool size", mRotatorReaderMap.size() + 1));
readerArrayPtr->pop_front();
mDevInodeReaderMap.erase(reader->GetDevInode());
mRotatorReaderMap[reader->GetDevInode()] = reader;
if (readerArrayPtr->size() == 0) {
return;
}
reader = (*readerArrayPtr)[0];
isFileOpen = reader->IsFileOpened();
LOG_DEBUG(sLogger, ("read other file", reader->GetDevInode().inode));
}
// the only situation where this condition is not met is when event is reader flush timeout
if (reader->IsFileOpened()) {
bool recreateReaderFlag = false;
// if dev inode changed, delete this reader and create reader
if (!reader->CheckDevInode()) {
LOG_INFO(sLogger,
("file dev inode changed, create new reader. new path",
logPath)("old path", reader->GetHostLogPath())(ToString(readerArrayPtr->size()),
mRotatorReaderMap.size())(
ToString(reader->GetDevInode().inode),
reader->GetLastFilePos())("DevInode map size", mDevInodeReaderMap.size()));
recreateReaderFlag = true;
AlarmManager::GetInstance()->SendAlarm(
INNER_PROFILE_ALARM,
string("file dev inode changed, create new reader. new path:") + reader->GetHostLogPath()
+ " ,project:" + reader->GetProject() + " ,logstore:" + reader->GetLogstore(),
reader->GetRegion(),
reader->GetProject(),
reader->GetConfigName(),
reader->GetLogstore());
}
// if signature is different and logpath is different, delete this reader and create reader
else if (!reader->CheckFileSignatureAndOffset(isFileOpen) && logPath != reader->GetHostLogPath()) {
LOG_INFO(sLogger,
("file sig and name both changed, create new reader. new path",
logPath)("old path", reader->GetHostLogPath())(ToString(readerArrayPtr->size()),
mRotatorReaderMap.size())(
ToString(reader->GetDevInode().inode),
reader->GetLastFilePos())("DevInode map size", mDevInodeReaderMap.size()));
recreateReaderFlag = true;
}
if (recreateReaderFlag) {
LOG_INFO(
sLogger,
("need to recreate reader", "remove the corresponding reader from the log reader queue")(
"project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
"log reader queue name", reader->GetHostLogPath())(
"log reader queue size", readerArrayPtr->size() - 1)("file device", reader->GetDevInode().dev)(
"file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
readerArrayPtr->pop_front();
mDevInodeReaderMap.erase(reader->GetDevInode());
// delete this reader, do not insert into rotator reader map
// repush this event and wait for create reader
Event* ev = new Event(event);
LogInput::GetInstance()->PushEventQueue(ev);
return;
}
if (reader->ShouldForceReleaseDeletedFileFd()) {
LOG_INFO(sLogger,
("force closing the file, project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
"file size", reader->GetFileSize())("last file position", reader->GetLastFilePos()));
reader->CloseFilePtr();
}
}
bool hasMoreData;
do {
if (!ProcessQueueManager::GetInstance()->IsValidToPush(reader->GetQueueKey())) {
static int32_t s_lastOutPutTime = 0;
int32_t curTime = time(NULL);
if (curTime - s_lastOutPutTime > 600) {
s_lastOutPutTime = curTime;
LOG_WARNING(sLogger,
("logprocess queue is full, put modify event to event queue again",
reader->GetHostLogPath())(reader->GetProject(), reader->GetLogstore()));
AlarmManager::GetInstance()->SendAlarm(
PROCESS_QUEUE_BUSY_ALARM,
string("logprocess queue is full, put modify event to event queue again, file:")
+ reader->GetHostLogPath(),
reader->GetRegion(),
reader->GetProject(),
reader->GetConfigName(),
reader->GetLogstore());
}
BlockedEventManager::GetInstance()->UpdateBlockEvent(
reader->GetQueueKey(), mConfigName, event, reader->GetDevInode(), curTime);
return;
}
auto logBuffer = make_unique<LogBuffer>();
hasMoreData = reader->ReadLog(*logBuffer, &event);
int32_t pushRetry = PushLogToProcessor(reader, logBuffer.get());
if (!hasMoreData) {
if (reader->IsFileDeleted()) {
LOG_INFO(sLogger,
("close the file", "current file has been read, and is marked deleted")(
"project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
"file size", reader->GetFileSize()));
reader->CloseFilePtr();
} else if (reader->IsContainerStopped()) {
// update container info one more time, ensure file is hold by same cotnainer
if (reader->UpdateContainerInfo() && !reader->IsContainerStopped()) {
LOG_INFO(sLogger,
("file is reused by a new container", reader->GetContainerID())(
"project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)(
"file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
} else {
// release fd as quick as possible
LOG_INFO(sLogger,
("close the file",
"current file has been read, and the relative container has been stopped")(
"project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)(
"file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
ForceReadLogAndPush(reader);
reader->CloseFilePtr();
}
}
break;
}
if (pushRetry >= 5 || GetCurrentTimeInMicroSeconds() - beginTime > mReadFileTimeSlice) {
LOG_DEBUG(
sLogger,
("read log breakout", "file io cost 1 time slice (50ms) or push blocked")("pushRetry", pushRetry)(
"begin time", beginTime)("path", event.GetSource())("file", event.GetObject()));
Event* ev = new Event(event);
ev->SetConfigName(mConfigName);
LogInput::GetInstance()->PushEventQueue(ev);
break;
}
// When the loginput thread is held on, we should repush this event.
// If we don't repush it and this file gets no further modify event, this reader will never be read again.
if (LogInput::GetInstance()->IsInterupt()) {
if (hasMoreData) {
LOG_INFO(
sLogger,
("read log interupt but has more data, reason", "log input thread hold on")(
"action", "repush modify event to event queue")("begin time", beginTime)(
"path", event.GetSource())("file", event.GetObject())("inode", reader->GetDevInode().inode)(
"offset", reader->GetLastFilePos())("size", reader->GetFileSize()));
} else {
LOG_DEBUG(
sLogger,
("read log breakout, reason", "log input thread hold on")(
"action", "repush modify event to event queue")("begin time", beginTime)(
"path", event.GetSource())("file", event.GetObject())("inode", reader->GetDevInode().inode)(
"offset", reader->GetLastFilePos())("size", reader->GetFileSize()));
}
Event* ev = new Event(event);
ev->SetConfigName(mConfigName);
LogInput::GetInstance()->PushEventQueue(ev);
break;
}
} while (true);
if (!hasMoreData && readerArrayPtr->size() > (size_t)1) {
// when a rotated reader finish its reading, it's unlikely that there will be data again
// so release file fd as quick as possible (open again if new data coming)
LOG_INFO(sLogger,
("close the file and move the corresponding reader to the rotator reader pool",
"current file has been read and more files are waiting in the log reader queue")(
"project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
"log reader queue name", reader->GetHostLogPath())("log reader queue size",
readerArrayPtr->size() - 1)(
"file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
"file size", reader->GetFileSize())("rotator reader pool size", mRotatorReaderMap.size() + 1));
ForceReadLogAndPush(reader);
reader->CloseFilePtr();
readerArrayPtr->pop_front();
mDevInodeReaderMap.erase(reader->GetDevInode());
mRotatorReaderMap[reader->GetDevInode()] = reader;
// need to push modify event again, but without dev inode
// use head dev + inode
Event* ev = new Event(event.GetSource(),
event.GetObject(),
event.GetType(),
event.GetWd(),
event.GetCookie(),
(*readerArrayPtr)[0]->GetDevInode().dev,
(*readerArrayPtr)[0]->GetDevInode().inode);
ev->SetConfigName(mConfigName);
LogInput::GetInstance()->PushEventQueue(ev);
}
}
// if a file is created and its dev inode cannot be found (which means it is a new file), create a reader for this
// file, then insert the reader into mDevInodeReaderMap
else if (event.IsCreate()) {
if (!devInode.IsValid()) {
return;
}
if (devInodeIter == mDevInodeReaderMap.end()) {
FileDiscoveryConfig discoveryConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName);
if (discoveryConfig.first
&& (!event.GetConfigName().empty() || discoveryConfig.first->IsMatch(path, name))) {
FileReaderConfig readerConfig = FileServer::GetInstance()->GetFileReaderConfig(mConfigName);
MultilineConfig multilineConfig = FileServer::GetInstance()->GetMultilineConfig(mConfigName);
FileTagConfig tagConfig = FileServer::GetInstance()->GetFileTagConfig(mConfigName);
uint32_t concurrency = FileServer::GetInstance()->GetExactlyOnceConcurrency(mConfigName);
LogFileReaderPtr readerPtr = CreateLogFileReaderPtr(
path, name, devInode, readerConfig, multilineConfig, discoveryConfig, tagConfig, concurrency, true);
if (readerPtr.get() == NULL) {
return;
}
}
}
}
}
// Periodic maintenance of the readers tracked by this handler.
//
// First delegates to MakeSpaceForNewReader(), DeleteTimeoutReader() and DeleteRollbackReader(), then walks
// mNameReaderMap and, for every queue holding exactly one reader:
//   - removes the reader when its file has been marked deleted for longer than
//     logreader_filedeleted_remove_interval, or otherwise
//   - asks the reader to close its file pointer if it has timed out (CloseTimeoutFilePtr).
// Queues holding more than one reader are left untouched; queues that are (or become) empty are erased from
// mNameReaderMap. The walk stops early once more than maxFilePtrCount file pointers were closed in this call.
void ModifyHandler::HandleTimeOut() {
    MakeSpaceForNewReader();
    DeleteTimeoutReader();
    DeleteRollbackReader();

    // remove deleted reader
    time_t nowTime = time(NULL);
    size_t closeFilePtrCount = 0;
    // Budget of file pointers to close per call: a tenth of the queue count when there are more than 100 queues,
    // otherwise 100.
    size_t maxFilePtrCount = mNameReaderMap.size() > (size_t)100 ? mNameReaderMap.size() / 10 : 100;
    for (NameLogFileReaderMap::iterator readerIter = mNameReaderMap.begin(); readerIter != mNameReaderMap.end();) {
        LogFileReaderPtrArray& readerArray = readerIter->second;
        // donot check file delete flag or close ptr when array size > 1
        if (readerArray.size() > 1) {
            LOG_DEBUG(sLogger,
                      ("HandleTimeOut filename", readerIter->first)("dir", readerArray[0]->GetHostLogPath().c_str())(
                          "action", "continue")("reason", "reader array size > 1"));
            ++readerIter;
            continue;
        }

        bool actioned = false;
        if (readerArray.size() == 1) {
            LogFileReaderPtrArray::iterator iter = readerArray.begin();
            // We don't care about container stop here.
            // Because delete event should come after fd is released and Read will finally return IsFileDeleted true.
            if ((*iter)->IsFileDeleted()
                && nowTime - (*iter)->GetDeletedTime() > INT32_FLAG(logreader_filedeleted_remove_interval)) {
                actioned = true;
                LOG_INFO(
                    sLogger,
                    ("current file is marked deleted with no more readers in the log reader queue for some time",
                     "remove the corresponding reader from the log reader queue")("project", (*iter)->GetProject())(
                        "logstore", (*iter)->GetLogstore())("config", mConfigName)(
                        "log reader queue name", (*iter)->GetHostLogPath())("log reader queue size", 0)(
                        "file device", (*iter)->GetDevInode().dev)("file inode", (*iter)->GetDevInode().inode)(
                        "file size", (*iter)->GetFileSize())("last file position", (*iter)->GetLastFilePos()));
                mDevInodeReaderMap.erase((*iter)->GetDevInode());
                readerArray.erase(iter);
            } else if (readerArray[0]->CloseTimeoutFilePtr(nowTime)) {
                // only close file ptr when readerArray size is 1
                // because when many file is queued, if we close file ptr, maybe we can't find this file again
                ++closeFilePtrCount;
                actioned = true;
                LOG_DEBUG(
                    sLogger,
                    ("HandleTimeOut filename", readerIter->first)("dir", readerArray[0]->GetHostLogPath().c_str())(
                        "action", "close")("reason", "file no new data timeout"));
            }
        }

        if (!actioned && !readerArray.empty()) {
            LOG_DEBUG(sLogger,
                      ("HandleTimeOut filename", readerIter->first)("dir", readerArray[0]->GetHostLogPath().c_str())(
                          "action", "continue")("reason", "nothing to do"));
        }

        if (readerArray.empty()) {
            // Report the queue count as it will be after the erase, matching the same log in
            // DeleteTimeoutReader(int32_t) and the post-removal sizes reported by the reader-removal logs.
            LOG_INFO(sLogger,
                     ("remove empty log reader queue, config", mConfigName)("log reader queue name", readerIter->first)(
                         "log reader queue count", mNameReaderMap.size() - 1));
            readerIter = mNameReaderMap.erase(readerIter);
        } else {
            ++readerIter;
        }

        if (closeFilePtrCount > maxFilePtrCount) {
            LOG_DEBUG(sLogger,
                      ("HandleTimeOut action",
                       "break")("closeFilePtrCount", closeFilePtrCount)("reason", "close file count > max file count"));
            break;
        }
    }
}
// Asks every reader of one pool to dump its meta via DumpMetaToMem().
//
// isRotatorReader selects the pool: true walks mRotatorReaderMap, false walks mDevInodeReaderMap.
// checkConfigFlag is forwarded unchanged to DumpMetaToMem().
// Readers from mDevInodeReaderMap are dumped together with their position inside their own reader array;
// CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY is used when the reader is not found there and for all rotator readers.
// Always returns true.
bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) {
    if (isRotatorReader) {
        for (auto& entry : mRotatorReaderMap) {
            entry.second->DumpMetaToMem(checkConfigFlag, LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY);
        }
        return true;
    }

    for (auto& entry : mDevInodeReaderMap) {
        auto& reader = entry.second;
        int32_t positionInArray = LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY;
        // Linear search for the reader inside the array it reports as its own.
        size_t idx = 0;
        while (idx < reader->GetReaderArray()->size()) {
            if (reader->GetReaderArray()->at(idx) == reader) {
                positionInArray = idx;
                break;
            }
            ++idx;
        }
        reader->DumpMetaToMem(checkConfigFlag, positionInArray);
    }
    return true;
}
// Reports whether every reader queue of this handler has been fully consumed.
//
// Returns false as soon as a queue holds more than one reader, or holds a single reader for which
// IsReadToEnd() is false. For each fully read single-reader queue visited before that point (or all of them
// when true is returned), ForceReadLogAndPush() is invoked so the last line is flushed immediately.
bool ModifyHandler::IsAllFileRead() {
    for (auto& entry : mNameReaderMap) {
        auto& readers = entry.second;
        if (readers.empty()) {
            continue;
        }
        if (readers.size() > 1 || !readers[0]->IsReadToEnd()) {
            return false;
        }
        // force flushing the last line immediately instead of waiting for timeout
        ForceReadLogAndPush(readers[0]);
    }
    return true;
}
// Chooses how long a reader may stay idle based on how many readers mDevInodeReaderMap holds, then runs
// DeleteTimeoutReader(int32_t) with that interval:
//   - above logreader_count_maxlimit   -> 86400 seconds
//   - above logreader_count_upperlimit -> 86400 * 7 seconds
//   - otherwise                        -> logreader_timeout_interval
void ModifyHandler::DeleteTimeoutReader() {
    const int32_t readerCount = (int32_t)mDevInodeReaderMap.size();
    int32_t timeoutInterval;
    if (readerCount > INT32_FLAG(logreader_count_maxlimit)) {
        timeoutInterval = 86400;
    } else if (readerCount > INT32_FLAG(logreader_count_upperlimit)) {
        timeoutInterval = 86400 * 7;
    } else {
        timeoutInterval = INT32_FLAG(logreader_timeout_interval);
    }
    DeleteTimeoutReader(timeoutInterval);
}
void ModifyHandler::DeleteTimeoutReader(int32_t timeoutInterval) {
time_t curTime = time(NULL);
NameLogFileReaderMap::iterator readerIter = mNameReaderMap.begin();
for (; readerIter != mNameReaderMap.end();) {
LogFileReaderPtrArray& readerArray = readerIter->second;
for (LogFileReaderPtrArray::iterator iter = readerArray.begin(); iter != readerArray.end();) {
int32_t interval = curTime - ((*iter)->GetLastUpdateTime());
if (interval > timeoutInterval) {
LOG_INFO(sLogger,
("remove the corresponding reader from the log reader queue",
"current file has not been updated for a long time")("project", (*iter)->GetProject())(
"logstore", (*iter)->GetLogstore())("config", mConfigName)(
"log reader queue name", (*iter)->GetHostLogPath())("log reader queue size",
readerArray.size() - 1)(
"file device", (*iter)->GetDevInode().dev)("file inode", (*iter)->GetDevInode().inode)(
"file size", (*iter)->GetFileSize())("last file position", (*iter)->GetLastFilePos()));
mDevInodeReaderMap.erase((*iter)->GetDevInode());
iter = readerArray.erase(iter);
}
// if current reader is not timeout, we should skip check last reader
// because when there are many readers, we only update first reader's last update time in modify handler
else
break;
}
if (readerArray.empty()) {
LOG_INFO(sLogger,
("remove empty log reader queue, config", mConfigName)("log reader queue name", readerIter->first)(
"log reader queue count", mNameReaderMap.size() - 1));
readerIter = mNameReaderMap.erase(readerIter);
} else {
++readerIter;
}
}
}
void ModifyHandler::DeleteRollbackReader() {
int32_t curTime = time(NULL);
DevInodeLogFileReaderMap::iterator readerIter = mRotatorReaderMap.begin();
vector<DevInode> deletedReaderKeys;
for (; readerIter != mRotatorReaderMap.end(); ++readerIter) {
int32_t interval = curTime - readerIter->second->GetLastUpdateTime();
readerIter->second->CloseTimeoutFilePtr(curTime);
if (interval > INT32_FLAG(logreader_filerotate_remove_interval)) {
deletedReaderKeys.push_back(readerIter->first);
LOG_INFO(sLogger,
("remove the corresponding reader from the reader rotator pool",
"current file has not been updated for a long time")("project", readerIter->second->GetProject())(
"logstore", readerIter->second->GetLogstore())("config", mConfigName)(
"file name", readerIter->second->GetRealLogPath())(
"file device", readerIter->second->GetDevInode().dev)("file inode",
readerIter->second->GetDevInode().inode)(
"file size", readerIter->second->GetFileSize())("last file position",
readerIter->second->GetLastFilePos()));
}
}
vector<DevInode>::iterator keyIter = deletedReaderKeys.begin();
for (; keyIter != deletedReaderKeys.end(); ++keyIter)
mRotatorReaderMap.erase(*keyIter);
}
// Performs one extra read on `reader`, driven by the reader's own flush-timeout event, and hands whatever was
// read to PushLogToProcessor(). Both return values (ReadLog's and PushLogToProcessor's) are ignored.
void ModifyHandler::ForceReadLogAndPush(LogFileReaderPtr reader) {
    auto flushBuffer = make_unique<LogBuffer>();
    auto flushEvent = reader->CreateFlushTimeoutEvent();

    LogBuffer* const rawBuffer = flushBuffer.get();
    reader->ReadLog(*rawBuffer, flushEvent.get());
    PushLogToProcessor(reader, rawBuffer);
}
// Turns the content of `logBuffer` into an event group and pushes it to the processor queue of `reader`.
//
// Nothing is done (and 0 is returned) when logBuffer->rawBuffer is empty. Otherwise the read length is reported
// through reader->ReportMetrics() and the push is repeated until ProcessorRunner::PushQueue() succeeds; on every
// 10th failed attempt LogInput::TryReadEvents(false) is called.
// Returns the number of failed push attempts.
int32_t ModifyHandler::PushLogToProcessor(LogFileReaderPtr reader, LogBuffer* logBuffer) {
    if (logBuffer->rawBuffer.empty()) {
        return 0;
    }

    reader->ReportMetrics(logBuffer->readLength);
    PipelineEventGroup group = LogFileReader::GenerateEventGroup(reader, logBuffer);

    int32_t failedAttempts = 0;
    // NOTE(review): `group` is handed over with std::move on every attempt, so this presumably relies on
    // PushQueue leaving it usable when it returns false — confirm against ProcessorRunner::PushQueue.
    for (;;) {
        if (ProcessorRunner::GetInstance()->PushQueue(reader->GetQueueKey(), 0, std::move(group))) // 10ms
        {
            break;
        }
        ++failedAttempts;
        if (failedAttempts % 10 == 0) {
            LogInput::GetInstance()->TryReadEvents(false);
        }
    }
    return failedAttempts;
}
} // namespace logtail