in core/file_server/event_handler/EventHandler.cpp [484:912]
// Handle one file-level event for the log file <event.GetSource()>/<event.GetObject()>.
//
// Events whose object name fails IsValidSuffix() are ignored. Otherwise the event is dispatched by type:
//  - deleted / move-from : mark the sole reader registered under this file name as deleted and close its fd
//                          once it is read to end (or a forced release is due).
//  - container stopped   : mark every reader that belongs to the stopped container, flush its cached data and
//                          close its fd once it is read to end (or a forced release is due).
//  - modify              : locate (or create) the reader queue for the file, make sure the head reader is usable,
//                          then read and push log data until the file is drained, the time slice is used up, the
//                          process queue is blocked, or the input thread is interrupted.
//  - create              : create a reader for a file whose dev/inode is not tracked yet.
void ModifyHandler::Handle(const Event& event) {
    const string& path = event.GetSource();
    const string& name = event.GetObject();
    if (!IsValidSuffix(name))
        return;
    DevInode devInode(event.GetDev(), event.GetInode());
    string logPath(path);
    logPath.append(PATH_SEPARATOR).append(name);
    // devInode is known to be invalid for inotify events
    if (!devInode.IsValid() && (event.IsModify() || event.IsCreate() || event.IsMoveTo())) {
        devInode = GetFileDevInode(logPath);
        if (!devInode.IsValid()) {
            // call stat failed, but we should try to find reader because the log file may be moved to another name
            LOG_DEBUG(sLogger, ("get file dev inode error", logPath));
        }
    }
    // Only look up by dev/inode when it is valid; otherwise behave as "not found".
    DevInodeLogFileReaderMap::iterator devInodeIter
        = devInode.IsValid() ? mDevInodeReaderMap.find(devInode) : mDevInodeReaderMap.end();
    // when file is deleted or movefrom, we can't find devinode, so set all log reader's delete flag
    if (event.IsDeleted() || event.IsMoveFrom()) {
        NameLogFileReaderMap::iterator iter = mNameReaderMap.find(name);
        if (iter != mNameReaderMap.end()) {
            LogFileReaderPtrArray& readerArray = iter->second;
            // only set when reader array size is 1
            if (readerArray.size() == 1) {
                readerArray[0]->SetFileDeleted(true);
                if (readerArray[0]->IsReadToEnd() || readerArray[0]->ShouldForceReleaseDeletedFileFd()) {
                    if (readerArray[0]->IsFileOpened()) {
                        LOG_INFO(
                            sLogger,
                            ("close the log queue header file",
                             "delete event has come, and only one reader exists in the corresponding log queue, and "
                             "the queue header file has been read or is forced to close")(
                                "project", readerArray[0]->GetProject())("logstore", readerArray[0]->GetLogstore())(
                                "config", mConfigName)("log reader queue name", readerArray[0]->GetHostLogPath())(
                                "file device", readerArray[0]->GetDevInode().dev)(
                                "file inode", readerArray[0]->GetDevInode().inode)("file size",
                                                                                   readerArray[0]->GetFileSize()));
                        // release fd as quick as possible
                        readerArray[0]->CloseFilePtr();
                    }
                }
            }
        }
    } else if (event.IsContainerStopped()) {
        // Visit every reader of every queue; only readers owned by the stopped container are touched.
        for (auto& pair : mNameReaderMap) {
            LogFileReaderPtrArray& readerArray = pair.second;
            for (auto& reader : readerArray) {
                if (reader->GetContainerID() != event.GetContainerID()) {
                    continue;
                }
                reader->SetContainerStopped();
                if (reader->IsReadToEnd() || reader->ShouldForceReleaseDeletedFileFd()) {
                    if (reader->IsFileOpened()) {
                        LOG_INFO(
                            sLogger,
                            ("close the file",
                             "the container has been stopped, and current file has been read or is forced to close")(
                                "project", reader->GetProject())("logstore", reader->GetLogstore())(
                                "config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
                                "file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
                                "file size", reader->GetFileSize())("container id", event.GetContainerID()));
                        // Flush the cache of the reader that is about to be closed. The checks and the flush must
                        // all target this reader, which is not necessarily the head of its queue.
                        if (!reader->ShouldForceReleaseDeletedFileFd() && reader->HasDataInCache()) {
                            ForceReadLogAndPush(reader);
                        }
                        // release fd as quick as possible
                        reader->CloseFilePtr();
                    }
                }
            }
        }
    } else if (event.IsModify()) {
        // devInode cannot be found, this means a rotate file(like a.log.1) has event, and reader for rotate file is
        // moved to mRotatorReaderMap
        if (devInode.IsValid() && devInodeIter == mDevInodeReaderMap.end()) {
            DevInodeLogFileReaderMap::iterator rotateIter = mRotatorReaderMap.find(devInode);
            // the reader for file(whether it's a.log or a.log.1) exists in mDevInodeReaderMap or mRotatorReaderMap
            // if we can find reader in mRotatorReaderMap, it means the file after rotating(a.log.1) also matches config
            if (rotateIter != mRotatorReaderMap.end()) {
                // if dev inode change, just return.
                // if find devinode in rotator map, we should check file signature
                // if sig is different, delete this reader and create new one
                // if sig is same and file size not change, ignore it
                // if sig is same and file size is changed, move reader to normal reader map
                // Hold our own copy of the pointer: the map entry may be erased below.
                LogFileReaderPtr rotatorReader = rotateIter->second;
                LogFileReader::FileCompareResult cmpRst = rotatorReader->CompareToFile(logPath);
                LOG_DEBUG(sLogger,
                          ("find rotator reader", logPath)("compare result", (int)cmpRst)(
                              rotatorReader->GetHostLogPath(), rotatorReader->GetRealLogPath())(
                              ToString(rotatorReader->GetLastFilePos()), devInode.inode)("this", (uint64_t)this));
                switch (cmpRst) {
                    case LogFileReader::FileCompareResult_DevInodeChange:
                        return;
                    case LogFileReader::FileCompareResult_SigChange:
                        // Drop the stale reader; devInodeIter stays at end() so a new reader is created below.
                        mRotatorReaderMap.erase(rotateIter);
                        break;
                    case LogFileReader::FileCompareResult_SigSameSizeChange: {
                        rotatorReader->UpdateLogPath(logPath);
                        LogFileReaderPtrArray& readerArray = mNameReaderMap[rotatorReader->GetHostLogPathFile()];
                        // new log
                        if (rotatorReader->GetRealLogPath() == rotatorReader->GetHostLogPath()) {
                            readerArray.push_back(rotatorReader);
                        } else {
                            // rotate log, push front
                            readerArray.push_front(rotatorReader);
                        }
                        rotatorReader->SetReaderArray(&readerArray);
                        mDevInodeReaderMap[devInode] = rotatorReader;
                        mRotatorReaderMap.erase(rotateIter);
                        // Refresh the iterator so the lookup below sees the re-registered reader.
                        devInodeIter = mDevInodeReaderMap.find(devInode);
                    } break;
                    case LogFileReader::FileCompareResult_SigSameSizeSame:
                        return;
                    case LogFileReader::FileCompareResult_Error:
                        return;
                    default:
                        return;
                }
            }
        }
        // Start of the read time slice; compared against mReadFileTimeSlice in the read loop.
        uint64_t beginTime = GetCurrentTimeInMicroSeconds();
        LogFileReaderPtrArray* readerArrayPtr = nullptr;
        if (!devInode.IsValid()) {
            // call stat failed, but we should try to find reader because the log file may be moved to another name
            NameLogFileReaderMap::iterator iter = mNameReaderMap.find(name);
            if (iter != mNameReaderMap.end()) {
                readerArrayPtr = &(iter->second);
            } else {
                LOG_WARNING(sLogger, ("can not find logreader, may be deleted", logPath));
                return;
            }
        } else if (devInodeIter == mDevInodeReaderMap.end()) {
            // Valid dev/inode without a registered reader: try to create one.
            FileDiscoveryConfig discoveryConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName);
            // double check
            // if event with config name, skip check
            if (discoveryConfig.first
                && (!event.GetConfigName().empty() || discoveryConfig.first->IsMatch(path, name))) {
                FileReaderConfig readerConfig = FileServer::GetInstance()->GetFileReaderConfig(mConfigName);
                MultilineConfig multilineConfig = FileServer::GetInstance()->GetMultilineConfig(mConfigName);
                FileTagConfig tagConfig = FileServer::GetInstance()->GetFileTagConfig(mConfigName);
                uint32_t concurrency = FileServer::GetInstance()->GetExactlyOnceConcurrency(mConfigName);
                LogFileReaderPtr readerPtr = CreateLogFileReaderPtr(
                    path, name, devInode, readerConfig, multilineConfig, discoveryConfig, tagConfig, concurrency);
                if (readerPtr.get() == nullptr) {
                    LogFileReaderPtrArray& readerArray = mNameReaderMap[name];
                    // if rotate queue is full, try read array header
                    // The emptiness check keeps readerArray[0] valid even if the configured queue size is 0.
                    if (!readerArray.empty() && readerArray.size() >= readerConfig.first->mRotatorQueueSize) {
                        readerPtr = readerArray[0];
                        // push modify event, use head dev inode
                        // Event* ev = new Event(event.GetSource(), event.GetObject(), event.GetType(), event.GetWd(),
                        // event.GetCookie(), readerArray[0]->GetDevInode().dev, readerArray[0]->GetDevInode().inode);
                        // LogInput::GetInstance()->PushEventQueue(ev);
                    } else {
                        // other fail, return
                        return;
                    }
                }
                // if we need to skip first modify, reset flag and return
                if (readerPtr->NeedSkipFirstModify()) {
                    readerPtr->DisableSkipFirstModify();
                    return;
                }
                readerArrayPtr = readerPtr->GetReaderArray();
            } else {
                return;
            }
        } else {
            // A reader is already registered for this dev/inode: refresh its path and use its queue.
            devInodeIter->second->UpdateLogPath(logPath);
            readerArrayPtr = devInodeIter->second->GetReaderArray();
        }
        if (readerArrayPtr->size() == 0) {
            LOG_ERROR(sLogger, ("unknow error, reader array size is 0", logPath));
            return;
        }
        // Reading always proceeds from the head of the queue.
        LogFileReaderPtr reader = (*readerArrayPtr)[0];
        // If file modified, it means the file is existed, then we should set fileDeletedFlag to false
        // NOTE: This may override the correct delete flag, which will cause fd close delay!
        // reader->SetFileDeleted(false);
        // make sure file open success, or we just return
        // Remember whether the file was open before UpdateFilePtr(); passed to CheckFileSignatureAndOffset() below.
        bool isFileOpen = reader->IsFileOpened();
        while (!reader->UpdateFilePtr()) {
            if (event.IsReaderFlushTimeout()) {
                break;
            }
            if (errno == EMFILE) {
                LOG_WARNING(sLogger,
                            ("too many open files", "skip this read operation")("log path", reader->GetHostLogPath()));
                return;
            }
            // eg: a.log rotate to a.log1, event sequece : a.log write 2min ago, file ptr closed -> a.log rotate a.log1
            // -> logtail process new a.log modify -> open file fail -> [old] delete reader -> logtail process a.log1
            // modify
            // -> cannot find reader, treat as new file -> read log tail(1MB)
            // so when open file ptr faild, put this reader into rotator map, when process a.log1 modify event, we can
            // find it in rotator map
            LOG_INFO(sLogger,
                     ("open the file failed", "move the corresponding reader to the rotator reader pool")(
                         "project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
                         "log reader queue name", reader->GetHostLogPath())("log reader queue size",
                                                                            readerArrayPtr->size() - 1)(
                         "file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
                         "file size", reader->GetFileSize())("rotator reader pool size", mRotatorReaderMap.size() + 1));
            readerArrayPtr->pop_front();
            mDevInodeReaderMap.erase(reader->GetDevInode());
            mRotatorReaderMap[reader->GetDevInode()] = reader;
            if (readerArrayPtr->size() == 0) {
                return;
            }
            // Retry with the next reader in the queue.
            reader = (*readerArrayPtr)[0];
            isFileOpen = reader->IsFileOpened();
            LOG_DEBUG(sLogger, ("read other file", reader->GetDevInode().inode));
        }
        // the only situation where this condition is not met is when event is reader flush timeout
        if (reader->IsFileOpened()) {
            bool recreateReaderFlag = false;
            // if dev inode changed, delete this reader and create reader
            if (!reader->CheckDevInode()) {
                LOG_INFO(sLogger,
                         ("file dev inode changed, create new reader. new path",
                          logPath)("old path", reader->GetHostLogPath())(ToString(readerArrayPtr->size()),
                                                                         mRotatorReaderMap.size())(
                             ToString(reader->GetDevInode().inode),
                             reader->GetLastFilePos())("DevInode map size", mDevInodeReaderMap.size()));
                recreateReaderFlag = true;
                AlarmManager::GetInstance()->SendAlarm(
                    INNER_PROFILE_ALARM,
                    string("file dev inode changed, create new reader. new path:") + reader->GetHostLogPath()
                        + " ,project:" + reader->GetProject() + " ,logstore:" + reader->GetLogstore(),
                    reader->GetRegion(),
                    reader->GetProject(),
                    reader->GetConfigName(),
                    reader->GetLogstore());
            }
            // if signature is different and logpath is different, delete this reader and create reader
            else if (!reader->CheckFileSignatureAndOffset(isFileOpen) && logPath != reader->GetHostLogPath()) {
                LOG_INFO(sLogger,
                         ("file sig and name both changed, create new reader. new path",
                          logPath)("old path", reader->GetHostLogPath())(ToString(readerArrayPtr->size()),
                                                                         mRotatorReaderMap.size())(
                             ToString(reader->GetDevInode().inode),
                             reader->GetLastFilePos())("DevInode map size", mDevInodeReaderMap.size()));
                recreateReaderFlag = true;
            }
            if (recreateReaderFlag) {
                LOG_INFO(
                    sLogger,
                    ("need to recreate reader", "remove the corresponding reader from the log reader queue")(
                        "project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
                        "log reader queue name", reader->GetHostLogPath())(
                        "log reader queue size", readerArrayPtr->size() - 1)("file device", reader->GetDevInode().dev)(
                        "file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
                readerArrayPtr->pop_front();
                mDevInodeReaderMap.erase(reader->GetDevInode());
                // delete this reader, do not insert into rotator reader map
                // repush this event and wait for create reader
                Event* ev = new Event(event);
                LogInput::GetInstance()->PushEventQueue(ev);
                return;
            }
            if (reader->ShouldForceReleaseDeletedFileFd()) {
                LOG_INFO(sLogger,
                         ("force closing the file, project", reader->GetProject())("logstore", reader->GetLogstore())(
                             "config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
                             "file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
                             "file size", reader->GetFileSize())("last file position", reader->GetLastFilePos()));
                reader->CloseFilePtr();
            }
        }
        // Read loop: each iteration reads one buffer from the head reader and pushes it to the processor.
        bool hasMoreData = false;
        do {
            if (!ProcessQueueManager::GetInstance()->IsValidToPush(reader->GetQueueKey())) {
                // Rate-limit the warning/alarm below to once per 600 seconds (shared across all calls).
                static int32_t s_lastOutPutTime = 0;
                int32_t curTime = time(NULL);
                if (curTime - s_lastOutPutTime > 600) {
                    s_lastOutPutTime = curTime;
                    LOG_WARNING(sLogger,
                                ("logprocess queue is full, put modify event to event queue again",
                                 reader->GetHostLogPath())(reader->GetProject(), reader->GetLogstore()));
                    AlarmManager::GetInstance()->SendAlarm(
                        PROCESS_QUEUE_BUSY_ALARM,
                        string("logprocess queue is full, put modify event to event queue again, file:")
                            + reader->GetHostLogPath(),
                        reader->GetRegion(),
                        reader->GetProject(),
                        reader->GetConfigName(),
                        reader->GetLogstore());
                }
                // Hand the event to the blocked-event manager instead of reading now.
                BlockedEventManager::GetInstance()->UpdateBlockEvent(
                    reader->GetQueueKey(), mConfigName, event, reader->GetDevInode(), curTime);
                return;
            }
            auto logBuffer = make_unique<LogBuffer>();
            hasMoreData = reader->ReadLog(*logBuffer, &event);
            int32_t pushRetry = PushLogToProcessor(reader, logBuffer.get());
            if (!hasMoreData) {
                if (reader->IsFileDeleted()) {
                    LOG_INFO(sLogger,
                             ("close the file", "current file has been read, and is marked deleted")(
                                 "project", reader->GetProject())("logstore", reader->GetLogstore())(
                                 "config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
                                 "file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
                                 "file size", reader->GetFileSize()));
                    reader->CloseFilePtr();
                } else if (reader->IsContainerStopped()) {
                    // update container info one more time, ensure file is hold by same cotnainer
                    if (reader->UpdateContainerInfo() && !reader->IsContainerStopped()) {
                        LOG_INFO(sLogger,
                                 ("file is reused by a new container", reader->GetContainerID())(
                                     "project", reader->GetProject())("logstore", reader->GetLogstore())(
                                     "config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
                                     "file device", reader->GetDevInode().dev)(
                                     "file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
                    } else {
                        // release fd as quick as possible
                        LOG_INFO(sLogger,
                                 ("close the file",
                                  "current file has been read, and the relative container has been stopped")(
                                     "project", reader->GetProject())("logstore", reader->GetLogstore())(
                                     "config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
                                     "file device", reader->GetDevInode().dev)(
                                     "file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
                        ForceReadLogAndPush(reader);
                        reader->CloseFilePtr();
                    }
                }
                break;
            }
            if (pushRetry >= 5 || GetCurrentTimeInMicroSeconds() - beginTime > mReadFileTimeSlice) {
                LOG_DEBUG(
                    sLogger,
                    ("read log breakout", "file io cost 1 time slice (50ms) or push blocked")("pushRetry", pushRetry)(
                        "begin time", beginTime)("path", event.GetSource())("file", event.GetObject()));
                // Re-queue the event so the remaining data is read in a later round.
                Event* ev = new Event(event);
                ev->SetConfigName(mConfigName);
                LogInput::GetInstance()->PushEventQueue(ev);
                break;
            }
            // When loginput thread hold on, we should repush this event back.
            // If we don't repush and this file has no modify event, this reader will never been read.
            if (LogInput::GetInstance()->IsInterupt()) {
                // hasMoreData is necessarily true here: the !hasMoreData case left the loop above.
                LOG_INFO(
                    sLogger,
                    ("read log interupt but has more data, reason", "log input thread hold on")(
                        "action", "repush modify event to event queue")("begin time", beginTime)(
                        "path", event.GetSource())("file", event.GetObject())("inode", reader->GetDevInode().inode)(
                        "offset", reader->GetLastFilePos())("size", reader->GetFileSize()));
                Event* ev = new Event(event);
                ev->SetConfigName(mConfigName);
                LogInput::GetInstance()->PushEventQueue(ev);
                break;
            }
        } while (true);
        if (!hasMoreData && readerArrayPtr->size() > 1) {
            // when a rotated reader finish its reading, it's unlikely that there will be data again
            // so release file fd as quick as possible (open again if new data coming)
            LOG_INFO(sLogger,
                     ("close the file and move the corresponding reader to the rotator reader pool",
                      "current file has been read and more files are waiting in the log reader queue")(
                         "project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
                         "log reader queue name", reader->GetHostLogPath())("log reader queue size",
                                                                            readerArrayPtr->size() - 1)(
                         "file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
                         "file size", reader->GetFileSize())("rotator reader pool size", mRotatorReaderMap.size() + 1));
            ForceReadLogAndPush(reader);
            reader->CloseFilePtr();
            readerArrayPtr->pop_front();
            mDevInodeReaderMap.erase(reader->GetDevInode());
            mRotatorReaderMap[reader->GetDevInode()] = reader;
            // need to push modify event again, but without dev inode
            // use head dev + inode
            // The queue held more than one reader before pop_front(), so index 0 is valid here.
            Event* ev = new Event(event.GetSource(),
                                  event.GetObject(),
                                  event.GetType(),
                                  event.GetWd(),
                                  event.GetCookie(),
                                  (*readerArrayPtr)[0]->GetDevInode().dev,
                                  (*readerArrayPtr)[0]->GetDevInode().inode);
            ev->SetConfigName(mConfigName);
            LogInput::GetInstance()->PushEventQueue(ev);
        }
    }
    // if a file is created, and dev inode cannot found(this means it's a new file), create reader for this file, then
    // insert reader into mDevInodeReaderMap
    else if (event.IsCreate()) {
        if (!devInode.IsValid()) {
            return;
        }
        if (devInodeIter == mDevInodeReaderMap.end()) {
            FileDiscoveryConfig discoveryConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName);
            if (discoveryConfig.first
                && (!event.GetConfigName().empty() || discoveryConfig.first->IsMatch(path, name))) {
                FileReaderConfig readerConfig = FileServer::GetInstance()->GetFileReaderConfig(mConfigName);
                MultilineConfig multilineConfig = FileServer::GetInstance()->GetMultilineConfig(mConfigName);
                FileTagConfig tagConfig = FileServer::GetInstance()->GetFileTagConfig(mConfigName);
                uint32_t concurrency = FileServer::GetInstance()->GetExactlyOnceConcurrency(mConfigName);
                // The created reader is not used further here; the call is made for its registration side effects
                // (presumably inside CreateLogFileReaderPtr — confirm).
                LogFileReaderPtr readerPtr = CreateLogFileReaderPtr(
                    path, name, devInode, readerConfig, multilineConfig, discoveryConfig, tagConfig, concurrency, true);
                if (readerPtr.get() == nullptr) {
                    return;
                }
            }
        }
    }
}