core/file_server/event_handler/LogInput.cpp (463 lines of code) (raw):
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "LogInput.h"
#include <time.h>
#include "app_config/AppConfig.h"
#include "application/Application.h"
#include "checkpoint/CheckPointManager.h"
#include "common/FileSystemUtil.h"
#include "common/HashUtil.h"
#include "common/LogtailCommonFlags.h"
#include "common/RuntimeUtil.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "file_server/event/BlockEventManager.h"
#include "file_server/event_handler/EventHandler.h"
#include "file_server/event_handler/HistoryFileImporter.h"
#include "file_server/polling/PollingCache.h"
#include "file_server/polling/PollingDirFile.h"
#include "file_server/polling/PollingEventQueue.h"
#include "file_server/polling/PollingModify.h"
#include "file_server/reader/GloablFileDescriptorManager.h"
#include "file_server/reader/LogFileReader.h"
#include "logger/Logger.h"
#include "monitor/AlarmManager.h"
#include "monitor/Monitor.h"
using namespace std;
// Interval (seconds) between EventDispatcher::CheckSymbolicLink() runs in ProcessLoop; copied into
// mCheckSymbolicLinkInterval by the constructor.
DEFINE_FLAG_INT32(check_symbolic_link_interval, "seconds", 120);
// Interval (seconds) between ConfigManager::RegisterHandlers() runs in ProcessLoop; copied into
// mCheckBaseDirInterval by the constructor.
DEFINE_FLAG_INT32(check_base_dir_interval, "seconds", 60);
// Interval (seconds) between EventDispatcher::HandleTimeout() runs in ProcessLoop.
DEFINE_FLAG_INT32(check_timeout_interval, "seconds", 600);
// Maximum time ProcessLoop waits on mFeedbackCV when the event queue is empty.
DEFINE_FLAG_INT32(log_input_thread_wait_interval, "microseconds", 20 * 1000);
// Minimum interval between two non-forced inotify reads in TryReadEvents().
DEFINE_FLAG_INT64(read_fs_events_interval, "microseconds", 20 * 1000);
// Interval (seconds) between EventDispatcher::ProcessHandlerTimeOut() runs in ProcessLoop.
DEFINE_FLAG_INT32(check_handler_timeout_interval, "seconds", 180);
// Interval (seconds) between EventDispatcher::DumpInotifyWatcherDirs() runs in ProcessLoop.
DEFINE_FLAG_INT32(dump_inotify_watcher_interval, "seconds", 180);
// Interval (seconds) between ConfigManager::ClearConfigMatchCache() runs in ProcessLoop.
DEFINE_FLAG_INT32(clear_config_match_interval, "seconds", 600);
// Interval (seconds) between polls of BlockedEventManager::GetTimeoutEvent() in ProcessLoop.
DEFINE_FLAG_INT32(check_block_event_interval, "seconds", 1);
// Interval (seconds) between ReadLocalEvents() calls in ProcessLoop.
DEFINE_FLAG_INT32(read_local_event_interval, "seconds", 60);
// When true, a container-stopped event on a directory unregisters it (UnregisterAllDir) in ProcessEvent;
// when false, the directory is only stopped for that container (StopAllDir).
DEFINE_FLAG_BOOL(force_close_file_on_container_stopped,
"whether close file handler immediately when associate container stopped",
false);
namespace logtail {
// Constructs the input runner in a non-running state. The main-thread lock prefers writers so that
// HoldOn() (write side) is not starved by the event loop's repeated read acquisitions.
LogInput::LogInput() : mAccessMainThreadRWL(ReadWriteLock::PREFER_WRITER) {
    // Periodic-check intervals (seconds), snapshotted from their flags once at construction.
    mCheckBaseDirInterval = INT32_FLAG(check_base_dir_interval);
    mCheckSymbolicLinkInterval = INT32_FLAG(check_symbolic_link_interval);

    // Run-state flags: not interrupted, no pending force-clear, not idle.
    mInteruptFlag = false;
    mForceClearFlag = false;
    mIdleFlag = false;

    // Bookkeeping reset by the metric refresh in ProcessLoop().
    mEventProcessCount = 0;
    mLastUpdateMetricTime = 0;
}
// Nothing to release explicitly; members clean up after themselves.
LogInput::~LogInput() = default;
// Start() should only be called once except for UT
// Starts the event-handling worker (ProcessLoop) on its own thread.
// Only the first call creates the runner metrics and launches the worker; every call (including repeated
// calls from unit tests) clears the idle flag.
void LogInput::Start() {
    mIdleFlag = false;

    static bool sStarted = false;
    if (sStarted) {
        return;
    }
    sStarted = true;

    mInteruptFlag = false;

    auto fileServer = FileServer::GetInstance();
    mLastRunTime = fileServer->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME);
    mRegisterdHandlersTotal
        = fileServer->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL);
    mActiveReadersTotal
        = fileServer->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL);
    mEnableFileIncludedByMultiConfigs = fileServer->GetMetricsRecordRef().CreateIntGauge(
        METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);

    // The future is kept so HoldOn() can wait for the loop to finish on exit.
    mThreadRes = async(launch::async, &LogInput::ProcessLoop, this);
}
// Resumes the event loop after a pause by HoldOn().
void LogInput::Resume() {
    LOG_INFO(sLogger, ("event handle daemon resume", "starts"));
    // Clear the interrupt flag first, so that once the loop re-acquires its read lock, TryReadEvents()
    // and FlowControl() no longer bail out early.
    mInteruptFlag = false;
    // Release the write side of the lock taken by HoldOn().
    mAccessMainThreadRWL.unlock();
    LOG_INFO(sLogger, ("event handle daemon resume", "succeeded"));
}
void LogInput::HoldOn() {
if (Application::GetInstance()->IsExiting()) {
LOG_INFO(sLogger, ("input event handle daemon", "stop starts"));
unique_lock<mutex> lock(mThreadRunningMux);
if (!mThreadRes.valid()) {
return;
}
mThreadRes.wait(); // should we set a timeout here? what it network outrage for an hour?
LOG_INFO(sLogger, ("input event handle daemon", "stopped successfully"));
} else {
LOG_INFO(sLogger, ("input event handle daemon pause", "starts"));
mInteruptFlag = true;
mAccessMainThreadRWL.lock();
LOG_INFO(sLogger, ("input event handle daemon pause", "succeeded"));
}
}
void LogInput::TryReadEvents(bool forceRead) {
if (mInteruptFlag)
return;
int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds();
if (forceRead || curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval)) {
vector<Event*> inotifyEvents;
EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents);
if (inotifyEvents.size() > 0) {
PushEventQueue(inotifyEvents);
}
mLastReadEventMicroSeconds = curMicroSeconds;
}
vector<Event*> feedbackEvents;
BlockedEventManager::GetInstance()->GetFeedbackEvent(feedbackEvents);
if (feedbackEvents.size() > 0) {
PushEventQueue(feedbackEvents);
}
vector<Event*> pollingEvents;
PollingEventQueue::GetInstance()->PopAllEvents(pollingEvents);
if (pollingEvents.size() > 0) {
PushEventQueue(pollingEvents);
}
std::vector<Event*> containerStoppedEvents;
ConfigManager::GetInstance()->GetContainerStoppedEvents(containerStoppedEvents);
if (containerStoppedEvents.size() > 0) {
PushEventQueue(containerStoppedEvents);
}
mLastReadEventTime = ((int32_t)time(NULL));
}
// Throttles the caller according to process CPU usage.
// Sleeps `sleepCount` slices of 20ms, forcing an event read every 5 slices, and aborts early if the
// interrupt flag is raised. At most once per second, it then adjusts `sleepCount` (kept in [0, 50],
// i.e. up to ~1s) from LogtailMonitor's realtime CPU level: higher load means longer sleeps.
void LogInput::FlowControl() {
    static const int32_t kSliceMicroSeconds = 20 * 1000; // 20ms
    static const int32_t kMaxSlices = 50; // 1s
    static int32_t sleepCount = 10;
    static int32_t lastCheckTime = 0;

    for (int32_t slice = 1; slice <= sleepCount; ++slice) {
        if (mInteruptFlag) {
            return;
        }
        usleep(kSliceMicroSeconds);
        if (slice % 5 == 0) {
            TryReadEvents(true);
        }
    }
    if (mInteruptFlag) {
        return;
    }

    const int32_t now = static_cast<int32_t>(time(NULL));
    if (now - lastCheckTime < 1) {
        return;
    }
    lastCheckTime = now;

    const double cpuUsageLevel = LogtailMonitor::GetInstance()->GetRealtimeCpuLevel();
    int32_t delta;
    if (cpuUsageLevel >= 1.5) {
        delta = 5;
    } else if (cpuUsageLevel >= 1.2) {
        delta = 2;
    } else if (cpuUsageLevel >= 1.0) {
        delta = 1;
    } else if (cpuUsageLevel >= 0.9) {
        delta = 0;
    } else if (cpuUsageLevel >= 0.6) {
        delta = -1;
    } else if (cpuUsageLevel >= 0.3) {
        delta = -2;
    } else {
        delta = -5;
    }
    sleepCount += delta;
    if (sleepCount > kMaxSlices) {
        sleepCount = kMaxSlices;
    }
    if (sleepCount < 0) {
        sleepCount = 0;
    }
    LOG_DEBUG(sLogger, ("cpuUsageLevel", cpuUsageLevel)("sleepCount", sleepCount));
}
bool LogInput::ReadLocalEvents() {
Json::Value localEventJson; // will contains the root value after parsing.
ParseConfResult loadRes = ParseConfig(GetLocalEventDataFileName(), localEventJson);
LOG_DEBUG(sLogger, ("load local events", GetLocalEventDataFileName())("result", loadRes));
if (loadRes != CONFIG_OK || !localEventJson.isArray()) {
return false;
}
// set discard old data flag, so that history data will not be dropped.
BOOL_FLAG(ilogtail_discard_old_data) = false;
LOG_INFO(sLogger, ("load local events", GetLocalEventDataFileName())("event count", localEventJson.size()));
for (Json::ValueIterator iter = localEventJson.begin(); iter != localEventJson.end(); ++iter) {
const Json::Value& eventItem = *iter;
if (!eventItem.isObject()) {
continue;
}
string source;
string object;
string configName;
if (eventItem.isMember("dir") && eventItem["dir"].isString()) {
source = eventItem["dir"].asString();
} else {
continue;
}
if (source.empty()) {
continue;
}
// remove last '/' to makesure source not end with '/'
if (source[source.size() - 1] == PATH_SEPARATOR[0]) {
source.resize(source.size() - 1);
}
if (eventItem.isMember("name") && eventItem["name"].isString()) {
object = eventItem["name"].asString();
} else {
continue;
}
if (object.size() == 0) {
continue;
}
if (eventItem.isMember("config") && eventItem["config"].isString()) {
configName = eventItem["config"].asString();
} else {
continue;
}
FileDiscoveryConfig discoveryConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(configName);
FileReaderConfig readerConfig = FileServer::GetInstance()->GetFileReaderConfig(configName);
MultilineConfig multilineConfig = FileServer::GetInstance()->GetMultilineConfig(configName);
uint32_t concurrency = FileServer::GetInstance()->GetExactlyOnceConcurrency(configName);
if (!readerConfig.first) {
LOG_WARNING(sLogger, ("can not find config", configName));
continue;
}
HistoryFileEvent historyFileEvent;
historyFileEvent.mDirName = source;
historyFileEvent.mFileName = object;
historyFileEvent.mConfigName = configName;
historyFileEvent.mDiscoveryconfig = discoveryConfig;
historyFileEvent.mReaderConfig = readerConfig;
historyFileEvent.mMultilineConfig = multilineConfig;
historyFileEvent.mEOConcurrency = concurrency;
vector<string> objList;
if (!GetAllFiles(source, object, objList)) {
LOG_WARNING(sLogger, ("get all files", "failed"));
continue;
}
LOG_INFO(
sLogger,
("process local event, dir", source)("file name", object)("config", configName)(
"project", readerConfig.second->GetProjectName())("logstore", readerConfig.second->GetLogstoreName()));
AlarmManager::GetInstance()->SendAlarm(LOAD_LOCAL_EVENT_ALARM,
string("process local event, dir:") + source + ", file name:" + object
+ ", config:" + configName
+ ", file count:" + ToString(objList.size()),
readerConfig.second->GetRegion(),
readerConfig.second->GetProjectName(),
readerConfig.second->GetConfigName(),
readerConfig.second->GetLogstoreName());
HistoryFileImporter* importer = HistoryFileImporter::GetInstance();
importer->PushEvent(historyFileEvent);
}
// after process event, clear the local file
FILE* pFile = fopen(GetLocalEventDataFileName().c_str(), "w");
if (pFile != NULL) {
fclose(pFile);
}
return true;
}
// Dispatches one event and then deletes it (the function takes ownership of ev).
// - Timeout: unregister every watch under the event's directory.
// - Directory moved away, or its container stopped:
//   - unregister the directory when it moved, or when force_close_file_on_container_stopped is set;
//   - otherwise stop it for that container only.
// - Anything else: hand the event to the directory's handler, registering the directory first when no
//   handler exists yet (unless the event is a deletion).
void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {
    const string& source = ev->GetSource();
    const string& object = ev->GetObject();
    LOG_DEBUG(sLogger,
              ("process event, type", ev->GetTypeString())("dir", ev->GetSource())("filename", ev->GetObject())(
                  "config", ev->GetConfigName()));

    if (ev->IsTimeout()) {
        dispatcher->UnregisterAllDir(source);
    } else if (ev->IsDir() && (ev->IsMoveFrom() || ev->IsContainerStopped())) {
        const string dirPath = object.empty() ? source : source + PATH_SEPARATOR + object;
        if (ev->IsMoveFrom() || BOOL_FLAG(force_close_file_on_container_stopped)) {
            dispatcher->UnregisterAllDir(dirPath);
        } else {
            dispatcher->StopAllDir(dirPath, ev->GetContainerID());
        }
    } else {
        EventHandler* handler = dispatcher->GetHandler(source.c_str());
        if (handler == NULL && !ev->IsDeleted()) {
            LOG_DEBUG(sLogger,
                      ("LogInput handle event, find no handler", "register for it")("type", ev->GetType())(
                          "wd", ev->GetWd())("source", source)("object", object)("inode", ev->GetInode()));
            if (ConfigManager::GetInstance()->RegisterDirectory(source, object)) {
                handler = dispatcher->GetHandler(source.c_str());
            }
        }
        if (handler != NULL) {
            handler->Handle(*ev);
            dispatcher->PropagateTimeout(source.c_str());
        }
    }
    delete ev;
}
// Publishes the runner's gauges:
// - last event-read time;
// - process-wide open file count;
// - registered dir handlers;
// - active readers.
// Then resets the per-period processed-event counter.
// `curTime` is accepted for interface compatibility but is not used.
void LogInput::UpdateCriticalMetric(int32_t curTime) {
    (void)curTime;
    SET_GAUGE(mLastRunTime, mLastReadEventTime.load());

    const auto openedFileCount = GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize();
    LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(openedFileCount);

    const auto handlerCount = EventDispatcher::GetInstance()->GetHandlerCount();
    SET_GAUGE(mRegisterdHandlersTotal, handlerCount);
    const auto readerCount = CheckPointManager::Instance()->GetReaderCount();
    SET_GAUGE(mActiveReadersTotal, readerCount);

    mEventProcessCount = 0;
}
void LogInput::ProcessLoop() {
LOG_INFO(sLogger, ("event handle daemon", "started"));
EventDispatcher* dispatcher = EventDispatcher::GetInstance();
dispatcher->StartTimeCount();
int32_t prevTime = time(NULL);
mLastReadEventTime = prevTime;
int32_t curTime = prevTime;
srand(0); // avoid random failures in unit tests
int32_t lastCheckDir = prevTime - rand() % 60;
int32_t lastCheckSymbolicLink = prevTime - rand() % 60;
time_t lastCheckHandlerTimeOut = prevTime - rand() % 60;
int32_t lastDumpInotifyWatcherTime = prevTime - rand() % 60;
int32_t lastForceClearFlag = prevTime - rand() % 60;
int32_t lastClearConfigCache = prevTime - rand() % 60;
mLastReadEventMicroSeconds = 0;
mLastUpdateMetricTime = prevTime - rand() % 60;
int32_t lastCheckBlockedTime = prevTime;
int32_t lastReadLocalEventTime = prevTime;
mEventProcessCount = 0;
BlockedEventManager* pBlockedEventManager = BlockedEventManager::GetInstance();
string path;
while (true) {
ReadLock lock(mAccessMainThreadRWL);
TryReadEvents(false);
Event* ev = PopEventQueue();
if (ev != NULL) {
++mEventProcessCount;
if (mIdleFlag)
delete ev;
else
ProcessEvent(dispatcher, ev);
} else {
unique_lock<mutex> lock(mFeedbackMux);
mFeedbackCV.wait_for(lock, chrono::microseconds(INT32_FLAG(log_input_thread_wait_interval)));
}
if (mIdleFlag)
continue;
curTime = time(NULL);
if (curTime - lastCheckBlockedTime >= INT32_FLAG(check_block_event_interval)) {
std::vector<Event*> pEventVec;
pBlockedEventManager->GetTimeoutEvent(pEventVec, curTime);
if (pEventVec.size() > 0) {
PushEventQueue(pEventVec);
}
lastCheckBlockedTime = curTime;
}
if (curTime - lastReadLocalEventTime >= INT32_FLAG(read_local_event_interval)) {
ReadLocalEvents();
lastReadLocalEventTime = curTime;
}
if (curTime - mLastUpdateMetricTime >= 40) {
UpdateCriticalMetric(curTime);
mLastUpdateMetricTime = curTime;
}
if (curTime - lastForceClearFlag > 600 && GetForceClearFlag()) {
lastForceClearFlag = curTime;
prevTime = 0;
lastCheckDir = 0;
lastCheckHandlerTimeOut = 0;
lastCheckSymbolicLink = 0;
}
if (curTime - prevTime >= INT32_FLAG(check_timeout_interval)) {
dispatcher->HandleTimeout();
prevTime = curTime;
}
if (curTime - lastCheckDir >= mCheckBaseDirInterval) {
// do not need to clear file checkpoint, we will clear all checkpoint after DumpCheckPointToLocal
// CheckPointManager::Instance()->CheckTimeoutCheckPoint();
// check root watch dir
ConfigManager::GetInstance()->RegisterHandlers();
lastCheckDir = curTime;
}
if (curTime - lastCheckSymbolicLink >= mCheckSymbolicLinkInterval) {
dispatcher->CheckSymbolicLink();
lastCheckSymbolicLink = curTime;
}
if (curTime - lastCheckHandlerTimeOut >= INT32_FLAG(check_handler_timeout_interval)) {
// call handle timeout
dispatcher->ProcessHandlerTimeOut();
lastCheckHandlerTimeOut = curTime;
}
if (curTime - lastDumpInotifyWatcherTime > INT32_FLAG(dump_inotify_watcher_interval)) {
dispatcher->DumpInotifyWatcherDirs();
lastDumpInotifyWatcherTime = curTime;
}
if (curTime - lastClearConfigCache > INT32_FLAG(clear_config_match_interval)) {
ConfigManager::GetInstance()->ClearConfigMatchCache();
lastClearConfigCache = curTime;
}
if (Application::GetInstance()->IsExiting()
&& (!BOOL_FLAG(enable_full_drain_mode) || EventDispatcher::GetInstance()->IsAllFileRead())) {
break;
}
}
mInteruptFlag = true;
}
// Enqueues every event in eventVec, coalescing MODIFY events.
// The hash key is built from source>object>dev>inode>config. A MODIFY event whose key is already queued
// is deleted, and its slot in eventVec is set to NULL. Ownership of all other events passes to the queue.
void LogInput::PushEventQueue(std::vector<Event*>& eventVec) {
    for (Event*& ev : eventVec) {
        const string key = ev->GetSource() + ">" + ev->GetObject() + ">" + ToString(ev->GetDev()) + ">"
            + ToString(ev->GetInode()) + ">" + ev->GetConfigName();
        const int64_t hashKey = HashSignatureString(key.c_str(), key.size());
        const bool isModify = ev->GetType() == EVENT_MODIFY;
        if (isModify && mModifyEventSet.count(hashKey) > 0) {
            delete ev;
            ev = NULL;
            continue;
        }
        if (isModify) {
            mModifyEventSet.insert(hashKey);
        }
        mInotifyEventQueue.push(ev);
        ev->SetHashKey(hashKey);
    }
}
// Enqueues a single event, taking ownership of it.
// A MODIFY event whose source>object>dev>inode>config key is already queued is a duplicate and is
// deleted instead of queued.
void LogInput::PushEventQueue(Event* ev) {
    const string key = ev->GetSource() + ">" + ev->GetObject() + ">" + ToString(ev->GetDev()) + ">"
        + ToString(ev->GetInode()) + ">" + ev->GetConfigName();
    const int64_t hashKey = HashSignatureString(key.c_str(), key.size());
    if (ev->GetType() == EVENT_MODIFY) {
        if (mModifyEventSet.count(hashKey) > 0) {
            delete ev;
            return;
        }
        mModifyEventSet.insert(hashKey);
    }
    ev->SetHashKey(hashKey);
    mInotifyEventQueue.push(ev);
}
// Removes and returns the oldest queued event, or NULL when the queue is empty.
// Popping a MODIFY event drops its key from the dedup set, so a later MODIFY for the same file can be
// queued again. The caller takes ownership of the returned event.
Event* LogInput::PopEventQueue() {
    if (mInotifyEventQueue.empty()) {
        return NULL;
    }
    Event* const head = mInotifyEventQueue.front();
    mInotifyEventQueue.pop();
    if (head->GetType() == EVENT_MODIFY) {
        mModifyEventSet.erase(head->GetHashKey());
    }
    return head;
}
#ifdef APSARA_UNIT_TEST_MAIN
void LogInput::CleanEnviroments() {
mIdleFlag = true;
mInteruptFlag = true;
usleep(100 * 1000);
while (true) {
Event* ev = PopEventQueue();
if (ev == NULL)
break;
delete ev;
}
mModifyEventSet.clear();
}
#endif
} // namespace logtail