void ModifyHandler::Handle()

in core/file_server/event_handler/EventHandler.cpp [484:912]


// Handles one file event for the config owned by this handler.
//
// Events whose object name does not pass IsValidSuffix() are ignored. Otherwise the event is dispatched on its type:
//   - deleted / move-from: when the reader queue registered under the file name holds exactly one reader, mark that
//     reader deleted and close its fd if it has been read to end or must be force released.
//   - container stopped: mark every reader whose container id equals the event's container id, and flush + close
//     those that have been read to end or must be force released.
//   - modify: find (or create) the reader queue of the file, make sure the queue head can be opened and still refers
//     to the same file, then read and push logs until there is no more data, the time slice is used up, the push
//     retry count reaches 5, or the log input thread is interrupted. In the last three cases the event is repushed.
//   - create: create a reader when no reader is registered under the file's dev/inode.
//
// @param event the event to handle; it is never modified, repushes are done with copies.
void ModifyHandler::Handle(const Event& event) {
    const string& path = event.GetSource();
    const string& name = event.GetObject();

    if (!IsValidSuffix(name))
        return;

    DevInode devInode(event.GetDev(), event.GetInode());
    string logPath(path);
    logPath.append(PATH_SEPARATOR).append(name);
    // devInode is known to be invalid for inotify events
    if (!devInode.IsValid() && (event.IsModify() || event.IsCreate() || event.IsMoveTo())) {
        devInode = GetFileDevInode(logPath);
        if (!devInode.IsValid()) {
            // call stat failed, but we should try to find reader because the log file may be moved to another name
            LOG_DEBUG(sLogger, ("get file dev inode error", logPath));
        }
    }

    // only look up the reader by dev/inode when the dev/inode is usable
    DevInodeLogFileReaderMap::iterator devInodeIter
        = devInode.IsValid() ? mDevInodeReaderMap.find(devInode) : mDevInodeReaderMap.end();

    // when file is deleted or movefrom, we can't find devinode, so set all log reader's delete flag
    if (event.IsDeleted() || event.IsMoveFrom()) {
        NameLogFileReaderMap::iterator iter = mNameReaderMap.find(name);
        if (iter != mNameReaderMap.end()) {
            LogFileReaderPtrArray& readerArray = iter->second;
            // only set when reader array size is 1
            if (readerArray.size() == (size_t)1) {
                readerArray[0]->SetFileDeleted(true);
                if (readerArray[0]->IsReadToEnd() || readerArray[0]->ShouldForceReleaseDeletedFileFd()) {
                    if (readerArray[0]->IsFileOpened()) {
                        LOG_INFO(
                            sLogger,
                            ("close the log queue header file",
                             "delete event has come, and only one reader exists in the corresponding log queue, and "
                             "the queue header file has been read or is forced to close")(
                                "project", readerArray[0]->GetProject())("logstore", readerArray[0]->GetLogstore())(
                                "config", mConfigName)("log reader queue name", readerArray[0]->GetHostLogPath())(
                                "file device", readerArray[0]->GetDevInode().dev)(
                                "file inode", readerArray[0]->GetDevInode().inode)("file size",
                                                                                   readerArray[0]->GetFileSize()));
                        // release fd as quick as possible
                        readerArray[0]->CloseFilePtr();
                    }
                }
            }
        }
    } else if (event.IsContainerStopped()) {
        for (auto& pair : mNameReaderMap) {
            LogFileReaderPtrArray& readerArray = pair.second;
            for (auto& reader : readerArray) {
                if (reader->GetContainerID() != event.GetContainerID()) {
                    continue;
                }
                reader->SetContainerStopped();
                if (reader->IsReadToEnd() || reader->ShouldForceReleaseDeletedFileFd()) {
                    if (reader->IsFileOpened()) {
                        LOG_INFO(
                            sLogger,
                            ("close the file",
                             "the container has been stopped, and current file has been read or is forced to close")(
                                "project", reader->GetProject())("logstore", reader->GetLogstore())(
                                "config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
                                "file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
                                "file size", reader->GetFileSize())("container id", event.GetContainerID()));
                        // flush the cache of the reader that is about to be closed; the check and the flush must
                        // target the same reader, which is not necessarily the queue head
                        if (!reader->ShouldForceReleaseDeletedFileFd() && reader->HasDataInCache()) {
                            ForceReadLogAndPush(reader);
                        }
                        // release fd as quick as possible
                        reader->CloseFilePtr();
                    }
                }
            }
        }
    } else if (event.IsModify()) {
        // devInode cannot be found, this means a rotate file(like a.log.1) has event, and reader for rotate file is
        // moved to mRotatorReaderMap
        if (devInode.IsValid() && devInodeIter == mDevInodeReaderMap.end()) {
            DevInodeLogFileReaderMap::iterator rotateIter = mRotatorReaderMap.find(devInode);
            // the reader for file(whether it's a.log or a.log.1) exists in mDevInodeReaderMap or mRotatorReaderMap
            // if we can find reader in mRotatorReaderMap, it means the file after rotating(a.log.1) also matches config
            if (rotateIter != mRotatorReaderMap.end()) {
                // if dev inode change, just return.
                // if find devinode in rotator map, we should check file signature
                // if sig is different, delete this reader and create new one
                // if sig is same and file size not change, ignore it
                // if sig is same and file size is changed, move reader to normal reader map
                LogFileReaderPtr rotatorReader = rotateIter->second;
                LogFileReader::FileCompareResult cmpRst = rotatorReader->CompareToFile(logPath);
                LOG_DEBUG(sLogger,
                          ("find rotator reader", logPath)("compare result", (int)cmpRst)(
                              rotatorReader->GetHostLogPath(), rotatorReader->GetRealLogPath())(
                              ToString(rotatorReader->GetLastFilePos()), devInode.inode)("this", (uint64_t)this));
                switch (cmpRst) {
                    case LogFileReader::FileCompareResult_DevInodeChange:
                        return;
                        break;
                    case LogFileReader::FileCompareResult_SigChange:
                        // drop the stale reader; a new one is created below because devInodeIter is still end()
                        mRotatorReaderMap.erase(rotateIter);
                        break;
                    case LogFileReader::FileCompareResult_SigSameSizeChange: {
                        rotatorReader->UpdateLogPath(logPath);
                        LogFileReaderPtrArray& readerArray = mNameReaderMap[rotatorReader->GetHostLogPathFile()];
                        // new log
                        if (rotatorReader->GetRealLogPath() == rotatorReader->GetHostLogPath()) {
                            readerArray.push_back(rotatorReader);
                        } else {
                            // rotate log, push front
                            readerArray.push_front(rotatorReader);
                        }
                        rotatorReader->SetReaderArray(&readerArray);
                        mDevInodeReaderMap[devInode] = rotatorReader;
                        mRotatorReaderMap.erase(rotateIter);
                        // refresh the iterator so that the lookup below takes the "reader found" path
                        devInodeIter = mDevInodeReaderMap.find(devInode);
                    } break;
                    case LogFileReader::FileCompareResult_SigSameSizeSame:
                        return;
                        break;
                    case LogFileReader::FileCompareResult_Error:
                        return;
                        break;
                    default:
                        return;
                }
            }
        }
        // start of the read time slice, compared against mReadFileTimeSlice in the read loop
        uint64_t beginTime = GetCurrentTimeInMicroSeconds();
        LogFileReaderPtrArray* readerArrayPtr = nullptr;
        if (!devInode.IsValid()) {
            // call stat failed, but we should try to find reader because the log file may be moved to another name
            NameLogFileReaderMap::iterator iter = mNameReaderMap.find(name);
            if (iter != mNameReaderMap.end()) {
                readerArrayPtr = &(iter->second);
            } else {
                LOG_WARNING(sLogger, ("can not find logreader, may be deleted", logPath));
                return;
            }
        } else if (devInodeIter == mDevInodeReaderMap.end()) {
            FileDiscoveryConfig discoveryConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName);
            // double check
            // if event with config name, skip check
            if (discoveryConfig.first
                && (!event.GetConfigName().empty() || discoveryConfig.first->IsMatch(path, name))) {
                FileReaderConfig readerConfig = FileServer::GetInstance()->GetFileReaderConfig(mConfigName);
                MultilineConfig multilineConfig = FileServer::GetInstance()->GetMultilineConfig(mConfigName);
                FileTagConfig tagConfig = FileServer::GetInstance()->GetFileTagConfig(mConfigName);
                uint32_t concurrency = FileServer::GetInstance()->GetExactlyOnceConcurrency(mConfigName);
                LogFileReaderPtr readerPtr = CreateLogFileReaderPtr(
                    path, name, devInode, readerConfig, multilineConfig, discoveryConfig, tagConfig, concurrency);
                if (readerPtr.get() == nullptr) {
                    // NOTE: operator[] inserts an empty array when no entry exists for this name
                    LogFileReaderPtrArray& readerArray = mNameReaderMap[name];
                    // if rotate queue is full, try read array header
                    // the non-empty check keeps readerArray[0] valid even if mRotatorQueueSize is 0
                    if (readerArray.size() != 0 && readerArray.size() >= readerConfig.first->mRotatorQueueSize) {
                        readerPtr = readerArray[0];
                        // push modify event, use head dev inode
                        // Event* ev = new Event(event.GetSource(), event.GetObject(), event.GetType(), event.GetWd(),
                        // event.GetCookie(), readerArray[0]->GetDevInode().dev, readerArray[0]->GetDevInode().inode);
                        // LogInput::GetInstance()->PushEventQueue(ev);
                    } else {
                        // other fail, return
                        return;
                    }
                }
                // if we need to skip first modify, reset flag and return
                if (readerPtr->NeedSkipFirstModify()) {
                    readerPtr->DisableSkipFirstModify();
                    return;
                }
                readerArrayPtr = readerPtr->GetReaderArray();
            } else {
                return;
            }
        } else {
            devInodeIter->second->UpdateLogPath(logPath);
            readerArrayPtr = devInodeIter->second->GetReaderArray();
        }
        if (readerArrayPtr->size() == 0) {
            LOG_ERROR(sLogger, ("unknow error, reader array size is 0", logPath));
            return;
        }
        // always read from the head of the queue
        LogFileReaderPtr reader = (*readerArrayPtr)[0];
        // If file modified, it means the file is existed, then we should set fileDeletedFlag to false
        // NOTE: This may override the correct delete flag, which will cause fd close delay!
        // reader->SetFileDeleted(false);

        // make sure file open success, or we just return
        // isFileOpen records whether the file was already open before UpdateFilePtr(); it is passed to
        // CheckFileSignatureAndOffset() below
        bool isFileOpen = reader->IsFileOpened();
        while (!reader->UpdateFilePtr()) {
            if (event.IsReaderFlushTimeout()) {
                break;
            }

            if (errno == EMFILE) {
                LOG_WARNING(sLogger,
                            ("too many open files", "skip this read operation")("log path", reader->GetHostLogPath()));
                return;
            }
            // eg: a.log rotate to a.log1, event sequence : a.log write 2min ago, file ptr closed -> a.log rotate a.log1
            // -> logtail process new a.log modify -> open file fail -> [old] delete reader -> logtail process a.log1
            // modify
            //     -> cannot find reader, treat as new file -> read log tail(1MB)
            // so when open file ptr failed, put this reader into rotator map, when process a.log1 modify event, we can
            // find it in rotator map
            LOG_INFO(sLogger,
                     ("open the file failed", "move the corresponding reader to the rotator reader pool")(
                         "project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
                         "log reader queue name", reader->GetHostLogPath())("log reader queue size",
                                                                            readerArrayPtr->size() - 1)(
                         "file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
                         "file size", reader->GetFileSize())("rotator reader pool size", mRotatorReaderMap.size() + 1));
            readerArrayPtr->pop_front();
            mDevInodeReaderMap.erase(reader->GetDevInode());
            mRotatorReaderMap[reader->GetDevInode()] = reader;
            if (readerArrayPtr->size() == 0) {
                return;
            }
            // retry with the next reader in the queue
            reader = (*readerArrayPtr)[0];
            isFileOpen = reader->IsFileOpened();
            LOG_DEBUG(sLogger, ("read other file", reader->GetDevInode().inode));
        }

        // the only situation where this condition is not met is when event is reader flush timeout
        if (reader->IsFileOpened()) {
            bool recreateReaderFlag = false;
            // if dev inode changed, delete this reader and create reader
            if (!reader->CheckDevInode()) {
                LOG_INFO(sLogger,
                         ("file dev inode changed, create new reader. new path",
                          logPath)("old path", reader->GetHostLogPath())(ToString(readerArrayPtr->size()),
                                                                         mRotatorReaderMap.size())(
                             ToString(reader->GetDevInode().inode),
                             reader->GetLastFilePos())("DevInode map size", mDevInodeReaderMap.size()));
                recreateReaderFlag = true;
                AlarmManager::GetInstance()->SendAlarm(
                    INNER_PROFILE_ALARM,
                    string("file dev inode changed, create new reader. new path:") + reader->GetHostLogPath()
                        + " ,project:" + reader->GetProject() + " ,logstore:" + reader->GetLogstore(),
                    reader->GetRegion(),
                    reader->GetProject(),
                    reader->GetConfigName(),
                    reader->GetLogstore());
            }
            // if signature is different and logpath is different, delete this reader and create reader
            else if (!reader->CheckFileSignatureAndOffset(isFileOpen) && logPath != reader->GetHostLogPath()) {
                LOG_INFO(sLogger,
                         ("file sig and name both changed, create new reader. new path",
                          logPath)("old path", reader->GetHostLogPath())(ToString(readerArrayPtr->size()),
                                                                         mRotatorReaderMap.size())(
                             ToString(reader->GetDevInode().inode),
                             reader->GetLastFilePos())("DevInode map size", mDevInodeReaderMap.size()));
                recreateReaderFlag = true;
            }
            if (recreateReaderFlag) {
                LOG_INFO(
                    sLogger,
                    ("need to recreate reader", "remove the corresponding reader from the log reader queue")(
                        "project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
                        "log reader queue name", reader->GetHostLogPath())(
                        "log reader queue size", readerArrayPtr->size() - 1)("file device", reader->GetDevInode().dev)(
                        "file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
                readerArrayPtr->pop_front();
                mDevInodeReaderMap.erase(reader->GetDevInode());
                // delete this reader, do not insert into rotator reader map
                // repush this event and wait for create reader
                // (the event is copied as is; unlike the other repushes, SetConfigName is not called here)
                Event* ev = new Event(event);
                LogInput::GetInstance()->PushEventQueue(ev);
                return;
            }

            if (reader->ShouldForceReleaseDeletedFileFd()) {
                LOG_INFO(sLogger,
                         ("force closing the file, project", reader->GetProject())("logstore", reader->GetLogstore())(
                             "config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
                             "file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
                             "file size", reader->GetFileSize())("last file position", reader->GetLastFilePos()));
                reader->CloseFilePtr();
            }
        }

        // read and push until one of the exit conditions inside the loop is met
        bool hasMoreData = false;
        do {
            if (!ProcessQueueManager::GetInstance()->IsValidToPush(reader->GetQueueKey())) {
                // the warning and the alarm are emitted at most once every 600 seconds
                static int32_t s_lastOutPutTime = 0;
                int32_t curTime = time(nullptr);
                if (curTime - s_lastOutPutTime > 600) {
                    s_lastOutPutTime = curTime;
                    LOG_WARNING(sLogger,
                                ("logprocess queue is full, put modify event to event queue again",
                                 reader->GetHostLogPath())(reader->GetProject(), reader->GetLogstore()));

                    AlarmManager::GetInstance()->SendAlarm(
                        PROCESS_QUEUE_BUSY_ALARM,
                        string("logprocess queue is full, put modify event to event queue again, file:")
                            + reader->GetHostLogPath(),
                        reader->GetRegion(),
                        reader->GetProject(),
                        reader->GetConfigName(),
                        reader->GetLogstore());
                }

                BlockedEventManager::GetInstance()->UpdateBlockEvent(
                    reader->GetQueueKey(), mConfigName, event, reader->GetDevInode(), curTime);
                return;
            }
            auto logBuffer = make_unique<LogBuffer>();
            hasMoreData = reader->ReadLog(*logBuffer, &event);
            int32_t pushRetry = PushLogToProcessor(reader, logBuffer.get());
            if (!hasMoreData) {
                if (reader->IsFileDeleted()) {
                    LOG_INFO(sLogger,
                             ("close the file", "current file has been read, and is marked deleted")(
                                 "project", reader->GetProject())("logstore", reader->GetLogstore())(
                                 "config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
                                 "file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
                                 "file size", reader->GetFileSize()));
                    reader->CloseFilePtr();
                } else if (reader->IsContainerStopped()) {
                    // update container info one more time, ensure file is hold by same container
                    if (reader->UpdateContainerInfo() && !reader->IsContainerStopped()) {
                        LOG_INFO(sLogger,
                                 ("file is reused by a new container", reader->GetContainerID())(
                                     "project", reader->GetProject())("logstore", reader->GetLogstore())(
                                     "config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
                                     "file device", reader->GetDevInode().dev)(
                                     "file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
                    } else {
                        // release fd as quick as possible
                        LOG_INFO(sLogger,
                                 ("close the file",
                                  "current file has been read, and the relative container has been stopped")(
                                     "project", reader->GetProject())("logstore", reader->GetLogstore())(
                                     "config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
                                     "file device", reader->GetDevInode().dev)(
                                     "file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
                        ForceReadLogAndPush(reader);
                        reader->CloseFilePtr();
                    }
                }
                break;
            }
            // from here on hasMoreData is always true
            if (pushRetry >= 5 || GetCurrentTimeInMicroSeconds() - beginTime > mReadFileTimeSlice) {
                LOG_DEBUG(
                    sLogger,
                    ("read log breakout", "file io cost 1 time slice (50ms) or push blocked")("pushRetry", pushRetry)(
                        "begin time", beginTime)("path", event.GetSource())("file", event.GetObject()));
                Event* ev = new Event(event);
                ev->SetConfigName(mConfigName);
                LogInput::GetInstance()->PushEventQueue(ev);
                break;
            }

            // When loginput thread hold on, we should repush this event back.
            // If we don't repush and this file has no modify event, this reader will never been read.
            if (LogInput::GetInstance()->IsInterupt()) {
                LOG_INFO(sLogger,
                         ("read log interupt but has more data, reason", "log input thread hold on")(
                             "action", "repush modify event to event queue")("begin time", beginTime)(
                             "path", event.GetSource())("file", event.GetObject())("inode", reader->GetDevInode().inode)(
                             "offset", reader->GetLastFilePos())("size", reader->GetFileSize()));
                Event* ev = new Event(event);
                ev->SetConfigName(mConfigName);
                LogInput::GetInstance()->PushEventQueue(ev);
                break;
            }
        } while (true);

        if (!hasMoreData && readerArrayPtr->size() > (size_t)1) {
            // when a rotated reader finish its reading, it's unlikely that there will be data again
            // so release file fd as quick as possible (open again if new data coming)
            LOG_INFO(sLogger,
                     ("close the file and move the corresponding reader to the rotator reader pool",
                      "current file has been read and more files are waiting in the log reader queue")(
                         "project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
                         "log reader queue name", reader->GetHostLogPath())("log reader queue size",
                                                                            readerArrayPtr->size() - 1)(
                         "file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
                         "file size", reader->GetFileSize())("rotator reader pool size", mRotatorReaderMap.size() + 1));
            ForceReadLogAndPush(reader);
            reader->CloseFilePtr();
            readerArrayPtr->pop_front();
            mDevInodeReaderMap.erase(reader->GetDevInode());
            mRotatorReaderMap[reader->GetDevInode()] = reader;
            // need to push modify event again, but without dev inode
            // use head dev + inode
            // (the queue held more than one reader before pop_front, so index 0 is valid here)
            Event* ev = new Event(event.GetSource(),
                                  event.GetObject(),
                                  event.GetType(),
                                  event.GetWd(),
                                  event.GetCookie(),
                                  (*readerArrayPtr)[0]->GetDevInode().dev,
                                  (*readerArrayPtr)[0]->GetDevInode().inode);
            ev->SetConfigName(mConfigName);
            LogInput::GetInstance()->PushEventQueue(ev);
        }
    }
    // if a file is created, and dev inode cannot found(this means it's a new file), create reader for this file, then
    // insert reader into mDevInodeReaderMap
    else if (event.IsCreate()) {
        if (!devInode.IsValid()) {
            return;
        }
        if (devInodeIter == mDevInodeReaderMap.end()) {
            FileDiscoveryConfig discoveryConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName);
            if (discoveryConfig.first
                && (!event.GetConfigName().empty() || discoveryConfig.first->IsMatch(path, name))) {
                FileReaderConfig readerConfig = FileServer::GetInstance()->GetFileReaderConfig(mConfigName);
                MultilineConfig multilineConfig = FileServer::GetInstance()->GetMultilineConfig(mConfigName);
                FileTagConfig tagConfig = FileServer::GetInstance()->GetFileTagConfig(mConfigName);
                uint32_t concurrency = FileServer::GetInstance()->GetExactlyOnceConcurrency(mConfigName);
                LogFileReaderPtr readerPtr = CreateLogFileReaderPtr(
                    path, name, devInode, readerConfig, multilineConfig, discoveryConfig, tagConfig, concurrency, true);
                if (readerPtr.get() == nullptr) {
                    return;
                }
            }
        }
    }
}