core/file_server/ConfigManager.cpp (902 lines of code) (raw):

// Copyright 2022 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "ConfigManager.h" #include <cctype> #include <fcntl.h> #include <sys/stat.h> #include <sys/types.h> #if defined(__linux__) #include <fnmatch.h> #include <unistd.h> #endif #include <limits.h> #include <fstream> #include <set> #include <unordered_map> #include <unordered_set> #include <vector> #include "app_config/AppConfig.h" #include "checkpoint/CheckPointManager.h" #include "collection_pipeline/CollectionPipeline.h" #include "collection_pipeline/CollectionPipelineManager.h" #include "common/CompressTools.h" #include "common/ErrorUtil.h" #include "common/ExceptionBase.h" #include "common/FileSystemUtil.h" #include "common/HashUtil.h" #include "common/JsonUtil.h" #include "common/RuntimeUtil.h" #include "common/StringTools.h" #include "common/TimeUtil.h" #include "common/version.h" #include "constants/Constants.h" #include "file_server/EventDispatcher.h" #include "file_server/FileServer.h" #include "file_server/event_handler/EventHandler.h" #include "monitor/AlarmManager.h" using namespace std; // 商业版 // DEFINE_FLAG_BOOL(https_verify_peer, "set CURLOPT_SSL_VERIFYPEER, CURLOPT_SSL_VERIFYHOST option for libcurl", true); // #if defined(__linux__) || defined(__APPLE__) // DEFINE_FLAG_STRING(https_ca_cert, "set CURLOPT_CAINFO for libcurl", "ca-bundle.crt"); // #elif defined(_MSC_VER) // DEFINE_FLAG_STRING(https_ca_cert, "set CURLOPT_CAINFO for libcurl", "cacert.pem"); // #endif // DEFINE_FLAG_INT32(request_access_key_interval, "control the frenquency of GetAccessKey, seconds", 60); // DEFINE_FLAG_INT32(logtail_sys_conf_update_interval, "control the frenquency of load local machine conf, seconds", // 60); DEFINE_FLAG_BOOL(logtail_config_update_enable, "", true); // 废弃 // DEFINE_FLAG_STRING(default_global_topic, "default is empty string", ""); // DEFINE_FLAG_STRING(default_data_integrity_project, "default data integrity project", "data_integrity"); // DEFINE_FLAG_STRING(default_data_integrity_log_store, "default data integrity log store", "data_integrity"); // DEFINE_FLAG_INT32(default_data_integrity_time_pos, "default data integrity time pos", 0); // DEFINE_FLAG_STRING(default_log_time_reg, // "default log time reg", // "([0-9]{4})-(0[0-9]{1}|1[0-2])-(0[0-9]{1}|[12][0-9]{1}|3[01]) " // "(0[0-9]{1}|1[0-9]{1}|2[0-3]):[0-5][0-9]{1}:([0-5][0-9]{1})"); // DEFINE_FLAG_STRING(default_line_count_project, "default line count project", "line_count"); // DEFINE_FLAG_STRING(default_line_count_log_store, "default line count log store", "line_count"); // DEFINE_FLAG_BOOL(default_fuse_mode, "default fuse mode", false); // DEFINE_FLAG_BOOL(default_mark_offset_flag, "default mark offset flag", false); // DEFINE_FLAG_BOOL(default_check_ulogfs_env, "default check ulogfs env", false); // DEFINE_FLAG_INT32(default_max_depth_from_root, "default max depth from root", 1000); // DEFINE_FLAG_STRING(fuse_customized_config_name, // "name of specified config for fuse, should not be used by user", // "__FUSE_CUSTOMIZED_CONFIG__"); // 移动 // DEFINE_FLAG_STRING(profile_project_name, "profile project_name for logtail", ""); // DEFINE_FLAG_STRING(ilogtail_docker_file_path_config, "ilogtail docker path config file", "docker_path_config.json"); // DEFINE_FLAG_INT32(default_plugin_log_queue_size, "", 10); DEFINE_FLAG_INT32(wildcard_max_sub_dir_count, "", 1000); DEFINE_FLAG_INT32(config_match_max_cache_size, "", 1000000); DEFINE_FLAG_INT32(multi_config_alarm_interval, "second", 600); DEFINE_FLAG_STRING(ilogtail_docker_path_version, "ilogtail docker path config file", "0.1.0"); DEFINE_FLAG_INT32(max_docker_config_update_times, "max times docker config update in 3 minutes", 10); DEFINE_FLAG_INT32(docker_config_update_interval, "interval between docker config updates, seconds", 3); namespace logtail { // ParseConfResult ParseConfig(const std::string& configName, Json::Value& jsonRoot) { // Get full path, if it is a relative path, prepend process execution dir. std::string fullPath = configName; if (IsRelativePath(fullPath)) { fullPath = GetProcessExecutionDir() + configName; } std::string buffer; if (FileReadResult::kOK != ReadFileContent(fullPath, buffer)) { return CONFIG_NOT_EXIST; } if (!IsValidJson(buffer.c_str(), buffer.length())) { return CONFIG_INVALID_FORMAT; } Json::CharReaderBuilder builder; builder["collectComments"] = false; std::unique_ptr<Json::CharReader> jsonReader(builder.newCharReader()); std::string jsonParseErrs; if (!jsonReader->parse(buffer.data(), buffer.data() + buffer.size(), &jsonRoot, &jsonParseErrs)) { LOG_WARNING(sLogger, ("ConfigName", configName)("ParseConfig error", jsonParseErrs)); return CONFIG_INVALID_FORMAT; } return CONFIG_OK; } // dir which is not timeout will be registerd recursively // if checkTimeout, will not register the dir which is timeout // if not checkTimeout, will register the dir which is timeout and add it to the timeout list bool ConfigManager::RegisterHandlersRecursively(const std::string& path, const FileDiscoveryConfig& config, bool checkTimeout) { if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) { LOG_INFO(sLogger, ("ignore path matching host path blacklist", path)); return false; } bool result = false; if (checkTimeout && config.first->IsTimeout(path)) { return result; } if (!config.first->IsDirectoryInBlacklist(path)) { result = EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler); } if (!result) { return result; } fsutil::Dir dir(path); if (!dir.Open()) { auto err = GetErrno(); AlarmManager::GetInstance()->SendAlarm(LOGDIR_PERMISSION_ALARM, string("Failed to open dir : ") + path + ";\terrno : " + ToString(err), config.second->GetRegion(), config.second->GetProjectName(), config.second->GetConfigName(), config.second->GetLogstoreName()); LOG_ERROR(sLogger, ("Open dir fail", path.c_str())("errno", ErrnoToString(err))); return false; } fsutil::Entry ent; while ((ent = dir.ReadNext())) { string item = PathJoin(path, ent.Name()); if (ent.IsDir()) { if (false == RegisterHandlersRecursively(item, config, checkTimeout)) { result = false; } } } return result; } ConfigManager::ConfigManager() { // mEnvFlag = false; // mStartTime = time(NULL); // mRemoveConfigFlag = false; // mHaveMappingPathConfig = false; // mMappingPathsChanged = false; mSharedHandler = NULL; // mThreadIsRunning = true; // mUpdateStat = NORMAL; // mRegionType = REGION_CORP; // mLogtailSysConfUpdateTime = 0; // mUserDefinedId.clear(); // mUserDefinedIdSet.clear(); // mAliuidSet.clear(); // SetDefaultPubAccessKeyId(STRING_FLAG(default_access_key_id)); // SetDefaultPubAccessKey(STRING_FLAG(default_access_key)); // SetDefaultPubAliuid(""); // SetUserAK(STRING_FLAG(logtail_profile_aliuid), // STRING_FLAG(logtail_profile_access_key_id), // STRING_FLAG(logtail_profile_access_key)); srand(time(NULL)); // CorrectionLogtailSysConfDir(); // first create dir then rewrite system-uuid file in GetSystemUUID // use a thread to get uuid, work around for CalculateDmiUUID hang // mUUID = CalculateDmiUUID(); // mInstanceId = CalculateRandomUUID() + "_" + LoongCollectorMonitor::mIpAddr + "_" + ToString(time(NULL)); // ReloadMappingConfig(); } ConfigManager::~ConfigManager() { // unordered_map<string, Config*>::iterator itr = mNameConfigMap.begin(); // for (; itr != mNameConfigMap.end(); ++itr) { // delete itr->second; // } unordered_map<std::string, EventHandler*>::iterator itr1 = mDirEventHandlerMap.begin(); for (; itr1 != mDirEventHandlerMap.end(); ++itr1) { if (itr1->second != mSharedHandler) delete itr1->second; } mDirEventHandlerMap.clear(); delete mSharedHandler; // mThreadIsRunning = false; // try { // if (mUUIDthreadPtr.get() != NULL) // mUUIDthreadPtr->GetValue(100); // } catch (...) { // } } void ConfigManager::RemoveHandler(const string& dir, bool delete_flag) { unordered_map<string, EventHandler*>::iterator itr = mDirEventHandlerMap.find(dir); if (itr != mDirEventHandlerMap.end()) { if (itr->second != mSharedHandler && delete_flag) { delete itr->second; } mDirEventHandlerMap.erase(itr); } } // this functions should only be called when register base dir bool ConfigManager::RegisterHandlers() { if (mSharedHandler == NULL) { mSharedHandler = new NormalEventHandler(); } vector<FileDiscoveryConfig> sortedConfigs; vector<FileDiscoveryConfig> wildcardConfigs; auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) { if (itr->second.first->GetWildcardPaths().empty()) sortedConfigs.push_back(itr->second); else wildcardConfigs.push_back(itr->second); } sort(sortedConfigs.begin(), sortedConfigs.end(), FileDiscoveryOptions::CompareByPathLength); bool result = true; for (auto itr = sortedConfigs.begin(); itr != sortedConfigs.end(); ++itr) { const FileDiscoveryOptions* config = itr->first; if (!config->IsContainerDiscoveryEnabled()) { result &= RegisterHandlers(config->GetBasePath(), *itr); } else { for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { result &= RegisterHandlers((*config->GetContainerInfo())[i].mRealBaseDir, *itr); } } } for (auto itr = wildcardConfigs.begin(); itr != wildcardConfigs.end(); ++itr) { const FileDiscoveryOptions* config = itr->first; if (!config->IsContainerDiscoveryEnabled()) { RegisterWildcardPath(*itr, config->GetWildcardPaths()[0], 0); } else { for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { RegisterWildcardPath(*itr, (*config->GetContainerInfo())[i].mRealBaseDir, 0); } } } return result; } void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, const std::string& path, int32_t depth) { if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) { LOG_INFO(sLogger, ("ignore path matching host path blacklist", path)); return; } bool finish; if ((depth + 1) == ((int)config.first->GetWildcardPaths().size() - 1)) finish = true; else if ((depth + 1) < ((int)config.first->GetWildcardPaths().size() - 1)) finish = false; else return; // const path if (!config.first->GetConstWildcardPaths()[depth].empty()) { // stat directly string item = PathJoin(path, config.first->GetConstWildcardPaths()[depth]); fsutil::PathStat baseDirStat; if (!fsutil::PathStat::stat(item, baseDirStat)) { LOG_DEBUG(sLogger, ("get wildcard dir info error: ", item)(config.second->GetProjectName(), config.second->GetLogstoreName())("error", ErrnoToString(GetErrno()))); return; } if (!baseDirStat.IsDir()) return; if (finish) { DirRegisterStatus registerStatus = EventDispatcher::GetInstance()->IsDirRegistered(item); if (registerStatus == GET_REGISTER_STATUS_ERROR) { return; } if (config.first->mPreservedDirDepth < 0) RegisterDescendants( item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); else { // preserve_depth register RegisterHandlersWithinDepth(item, config, config.first->mPreservedDirDepth, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); } } else { RegisterWildcardPath(config, item, depth + 1); } return; } fsutil::Dir dir(path); if (!dir.Open()) { auto err = GetErrno(); AlarmManager::GetInstance()->SendAlarm(LOGDIR_PERMISSION_ALARM, string("Failed to open dir : ") + path + ";\terrno : " + ToString(err), config.second->GetRegion(), config.second->GetProjectName(), config.second->GetConfigName(), config.second->GetLogstoreName()); LOG_WARNING(sLogger, ("Open dir fail", path.c_str())("errno", err)); return; } fsutil::Entry ent; int32_t dirCount = 0; while ((ent = dir.ReadNext())) { if (dirCount >= INT32_FLAG(wildcard_max_sub_dir_count)) { LOG_WARNING(sLogger, ("too many sub directoried for path", path)("dirCount", dirCount)("basePath", config.first->GetBasePath())); AlarmManager::GetInstance()->SendAlarm(STAT_LIMIT_ALARM, string("too many sub directoried for path:" + path + " dirCount: " + ToString(dirCount) + " basePath" + config.first->GetBasePath()), config.second->GetRegion(), config.second->GetProjectName(), config.second->GetConfigName(), config.second->GetLogstoreName()); break; } if (!ent.IsDir()) continue; ++dirCount; string item = PathJoin(path, ent.Name()); // we should check match and then check finsh size_t dirIndex = config.first->GetWildcardPaths()[depth].size() + 1; #if defined(_MSC_VER) // Backward compatibility: the inner condition never happen. if (!BOOL_FLAG(enable_root_path_collection)) { if (dirIndex == (size_t)2) { dirIndex = 1; } } // For wildcard Windows path C:\*, mWildcardPaths[0] will be C:\, // so dirIndex should be adjusted by minus 1. else if (0 == depth) { const auto& firstWildcardPath = config.first->GetWildcardPaths()[0]; const auto pathSize = firstWildcardPath.size(); if (pathSize >= 2 && firstWildcardPath[pathSize - 1] == PATH_SEPARATOR[0] && firstWildcardPath[pathSize - 2] == ':') { --dirIndex; } } #else // if mWildcardPaths[depth] is '/', we should set dirIndex = 1 if (dirIndex == (size_t)2) { dirIndex = 1; } #endif if (fnmatch(&(config.first->GetWildcardPaths()[depth + 1].at(dirIndex)), ent.Name().c_str(), FNM_PATHNAME) == 0) { if (finish) { DirRegisterStatus registerStatus = EventDispatcher::GetInstance()->IsDirRegistered(item); if (registerStatus == GET_REGISTER_STATUS_ERROR) { return; } if (config.first->mPreservedDirDepth < 0) RegisterDescendants( item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); else { // preserve_depth register RegisterHandlersWithinDepth( item, config, config.first->mPreservedDirDepth, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); } } else { RegisterWildcardPath(config, item, depth + 1); } } } } // this functions should only be called when register base dir bool ConfigManager::RegisterHandlers(const string& basePath, const FileDiscoveryConfig& config) { bool result = true; // static set<string> notExistDirs; // static int32_t lastErrorLogTime = time(NULL); // if (notExistDirs.size() > 0 && (time(NULL) - lastErrorLogTime > 3600)) { // string dirs; // for (set<string>::iterator iter = notExistDirs.begin(); iter != notExistDirs.end(); ++iter) { // dirs.append(*iter); // dirs.append(" "); // } // lastErrorLogTime = time(NULL); // notExistDirs.clear(); // LOG_WARNING(sLogger, ("logPath in config not exist", dirs)); // } if (!CheckExistance(basePath)) { // if (!(config->mLogType == APSARA_LOG && basePath.find("/tmp") == 0) // && basePath.find(LOG_LOCAL_DEFINED_PATH_PREFIX) != 0) // notExistDirs.insert(basePath); if (EventDispatcher::GetInstance()->IsRegistered(basePath.c_str())) { EventDispatcher::GetInstance()->UnregisterAllDir(basePath); LOG_DEBUG(sLogger, ("logPath in config not exist, unregister existed monitor", basePath)); } return result; } DirRegisterStatus registerStatus = EventDispatcher::GetInstance()->IsDirRegistered(basePath); if (registerStatus == GET_REGISTER_STATUS_ERROR) return result; if (config.first->mPreservedDirDepth < 0) result = RegisterDescendants( basePath, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); else { // preserve_depth register result = RegisterHandlersWithinDepth(basePath, config, config.first->mPreservedDirDepth, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); } return result; } bool ConfigManager::RegisterDirectory(const std::string& source, const std::string& object) { // TODO: A potential bug: FindBestMatch will test @object with filePattern, which has very // low possibility to match a sub directory name, so here will return false in most cases. // e.g.: source: /path/to/monitor, file pattern: *.log, object: subdir. // Match(subdir, *.log) = false. FileDiscoveryConfig config = FindBestMatch(source, object); if (config.first && !config.first->IsDirectoryInBlacklist(source)) { return EventDispatcher::GetInstance()->RegisterEventHandler(source, config, mSharedHandler); } return false; } bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int preservedDirDepth, int maxDepth) { if (maxDepth < 0) { return true; } if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) { LOG_INFO(sLogger, ("ignore path matching host path blacklist", path)); return false; } if (preservedDirDepth < 0) { fsutil::PathStat statBuf; if (!fsutil::PathStat::stat(path, statBuf)) { return true; } int64_t sec = 0; int64_t nsec = 0; statBuf.GetLastWriteTime(sec, nsec); auto curTime = time(nullptr); if (curTime - sec > INT32_FLAG(timeout_interval)) { return true; } } fsutil::Dir dir(path); if (!dir.Open()) { int err = GetErrno(); AlarmManager::GetInstance()->SendAlarm(LOGDIR_PERMISSION_ALARM, string("Failed to open dir : ") + path + ";\terrno : " + ToString(err), config.second->GetRegion(), config.second->GetProjectName(), config.second->GetConfigName(), config.second->GetLogstoreName()); LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err)); return false; } if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler))) { // break;// fail early, do not try to register others return false; } if (maxDepth == 0) { return true; } if (preservedDirDepth == 0) { DirCheckPointPtr dirCheckPoint; if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint)) { // path had dircheckpoint means it was watched before, so it is valid const set<string>& subdir = dirCheckPoint.get()->mSubDir; for (const auto& it : subdir) { RegisterHandlersWithinDepth(it, config, 0, maxDepth - 1); } return true; } } fsutil::Entry ent; while ((ent = dir.ReadNext())) { string item = PathJoin(path, ent.Name()); if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) { RegisterHandlersWithinDepth(item, config, preservedDirDepth - 1, maxDepth - 1); } } return true; } // path not terminated by '/', path already registered bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryConfig& config, int withinDepth) { if (withinDepth < 0) { return true; } if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) { LOG_INFO(sLogger, ("ignore path matching host path blacklist", path)); return false; } fsutil::Dir dir(path); if (!dir.Open()) { auto err = GetErrno(); AlarmManager::GetInstance()->SendAlarm(LOGDIR_PERMISSION_ALARM, string("Failed to open dir : ") + path + ";\terrno : " + ToString(err), config.second->GetRegion(), config.second->GetProjectName(), config.second->GetConfigName(), config.second->GetLogstoreName()); LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err)); return false; } if (!EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler)) { // break;// fail early, do not try to register others return false; } if (withinDepth == 0) { return true; } fsutil::Entry ent; bool result = true; while ((ent = dir.ReadNext())) { string item = PathJoin(path, ent.Name()); if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) { RegisterDescendants(item, config, withinDepth - 1); } } return result; } void ConfigManager::ClearConfigMatchCache() { static const int32_t FORCE_CLEAR_INTERVAL = 6 * 3600; static int32_t s_lastClearTime = (int32_t)time(NULL) - rand() % 600; static int32_t s_lastClearAllTime = (int32_t)time(NULL) - rand() % 600; int32_t curTime = (int32_t)time(NULL); ScopedSpinLock cachedLock(mCacheFileConfigMapLock); if (curTime - s_lastClearTime > FORCE_CLEAR_INTERVAL || mCacheFileConfigMap.size() > (size_t)INT32_FLAG(config_match_max_cache_size)) { s_lastClearTime = curTime; mCacheFileConfigMap.clear(); } ScopedSpinLock allCachedLock(mCacheFileAllConfigMapLock); if (curTime - s_lastClearAllTime > FORCE_CLEAR_INTERVAL || mCacheFileAllConfigMap.size() > (size_t)INT32_FLAG(config_match_max_cache_size)) { s_lastClearAllTime = curTime; mCacheFileAllConfigMap.clear(); } } FileDiscoveryConfig ConfigManager::FindBestMatch(const string& path, const string& name) { string cachedFileKey(path); cachedFileKey.push_back('<'); cachedFileKey.append(name); bool acceptMultiConfig = AppConfig::GetInstance()->IsAcceptMultiConfig(); { ScopedSpinLock cachedLock(mCacheFileConfigMapLock); auto iter = mCacheFileConfigMap.find(cachedFileKey); if (iter != mCacheFileConfigMap.end()) { // if need report alarm, do not return, just continue to find all match and send alarm if (acceptMultiConfig || iter->second.second == 0 || time(NULL) - iter->second.second < INT32_FLAG(multi_config_alarm_interval)) { return iter->second.first; } } } const auto& nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); auto itr = nameConfigMap.begin(); FileDiscoveryConfig prevMatch(nullptr, nullptr); size_t prevLen = 0; size_t curLen = 0; uint32_t nameRepeat = 0; string logNameList; vector<FileDiscoveryConfig> multiConfigs; for (; itr != nameConfigMap.end(); ++itr) { const FileDiscoveryOptions* config = itr->second.first; // // exclude __FUSE_CONFIG__ // if (itr->first == STRING_FLAG(fuse_customized_config_name)) { // continue; // } bool match = config->IsMatch(path, name); if (match) { // if force multi config, do not send alarm if (!name.empty() && !config->mAllowingIncludedByMultiConfigs) { nameRepeat++; logNameList.append("logstore:"); logNameList.append(itr->second.second->GetLogstoreName()); logNameList.append(",config:"); logNameList.append(itr->second.second->GetConfigName()); logNameList.append(" "); multiConfigs.push_back(itr->second); } // note: best config is the one which length is longest and create time is nearest curLen = config->GetBasePath().size(); if (prevLen < curLen) { prevMatch = itr->second; prevLen = curLen; } else if (prevLen == curLen && prevMatch.first) { if (prevMatch.second->GetCreateTime() > itr->second.second->GetCreateTime()) { prevMatch = itr->second; prevLen = curLen; } } } } // file include in multi config, find config for path will not trigger this alarm // do not send alarm to config with mForceMultiConfig if (nameRepeat > 1 && !name.empty() && !acceptMultiConfig) { LOG_ERROR(sLogger, ("file", path + '/' + name)("include in multi config", logNameList)("best", prevMatch.second->GetConfigName())); for (auto iter = multiConfigs.begin(); iter != multiConfigs.end(); ++iter) { AlarmManager::GetInstance()->SendAlarm( MULTI_CONFIG_MATCH_ALARM, path + '/' + name + " include in multi config, best and oldest match: " + prevMatch.second->GetProjectName() + ',' + prevMatch.second->GetLogstoreName() + ',' + prevMatch.second->GetConfigName() + ", allmatch: " + logNameList, (*iter).second->GetRegion(), (*iter).second->GetProjectName(), (*iter).second->GetConfigName(), (*iter).second->GetLogstoreName()); } } { ScopedSpinLock cachedLock(mCacheFileConfigMapLock); // use operator [], force update time mCacheFileConfigMap[cachedFileKey] = std::make_pair(prevMatch, nameRepeat > 1 && !name.empty() ? (int32_t)time(NULL) : (int32_t)0); } return prevMatch; } int32_t ConfigManager::FindAllMatch(vector<FileDiscoveryConfig>& allConfig, const std::string& path, const std::string& name /*= ""*/) { static AppConfig* appConfig = AppConfig::GetInstance(); string cachedFileKey(path); cachedFileKey.push_back('<'); cachedFileKey.append(name); const int32_t maxMultiConfigSize = appConfig->GetMaxMultiConfigSize(); { ScopedSpinLock cachedLock(mCacheFileAllConfigMapLock); auto iter = mCacheFileAllConfigMap.find(cachedFileKey); if (iter != mCacheFileAllConfigMap.end()) { if (iter->second.second == 0 || time(NULL) - iter->second.second < INT32_FLAG(multi_config_alarm_interval)) { allConfig = iter->second.first; return (int32_t)allConfig.size(); } } } bool alarmFlag = false; auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); auto itr = nameConfigMap.begin(); for (; itr != nameConfigMap.end(); ++itr) { const FileDiscoveryOptions* config = itr->second.first; // // exclude __FUSE_CONFIG__ // if (itr->first == STRING_FLAG(fuse_customized_config_name)) { // continue; // } bool match = config->IsMatch(path, name); if (match) { allConfig.push_back(itr->second); } } if (!name.empty() && allConfig.size() > static_cast<size_t>(maxMultiConfigSize)) { // only report log file alarm alarmFlag = true; sort(allConfig.begin(), allConfig.end(), FileDiscoveryOptions::CompareByDepthAndCreateTime); SendAllMatchAlarm(path, name, allConfig, maxMultiConfigSize); allConfig.resize(maxMultiConfigSize); } { ScopedSpinLock cachedLock(mCacheFileAllConfigMapLock); // use operator [], force update time mCacheFileAllConfigMap[cachedFileKey] = std::make_pair(allConfig, alarmFlag ? (int32_t)time(NULL) : (int32_t)0); } return (int32_t)allConfig.size(); } int32_t ConfigManager::FindMatchWithForceFlag(std::vector<FileDiscoveryConfig>& allConfig, const string& path, const string& name) { const bool acceptMultiConfig = AppConfig::GetInstance()->IsAcceptMultiConfig(); string cachedFileKey(path); cachedFileKey.push_back('<'); cachedFileKey.append(name); { ScopedSpinLock cachedLock(mCacheFileAllConfigMapLock); auto iter = mCacheFileAllConfigMap.find(cachedFileKey); if (iter != mCacheFileAllConfigMap.end()) { if (iter->second.second == 0 || time(NULL) - iter->second.second < INT32_FLAG(multi_config_alarm_interval)) { allConfig = iter->second.first; return (int32_t)allConfig.size(); } } } auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); auto itr = nameConfigMap.begin(); FileDiscoveryConfig prevMatch = make_pair(nullptr, nullptr); size_t prevLen = 0; size_t curLen = 0; uint32_t nameRepeat = 0; string logNameList; vector<FileDiscoveryConfig> multiConfigs; for (; itr != nameConfigMap.end(); ++itr) { FileDiscoveryConfig config = itr->second; // // exclude __FUSE_CONFIG__ // if (itr->first == STRING_FLAG(fuse_customized_config_name)) { // continue; // } bool match = config.first->IsMatch(path, name); if (match) { // if force multi config, do not send alarm if (!name.empty() && !config.first->mAllowingIncludedByMultiConfigs) { nameRepeat++; logNameList.append("logstore:"); logNameList.append(config.second->GetLogstoreName()); logNameList.append(",config:"); logNameList.append(config.second->GetConfigName()); logNameList.append(" "); multiConfigs.push_back(config); } if (!config.first->mAllowingIncludedByMultiConfigs) { // if not ForceMultiConfig, find best match in normal cofigs // note: best config is the one which length is longest and create time is nearest curLen = config.first->GetBasePath().size(); if (prevLen < curLen) { prevMatch = config; prevLen = curLen; } else if (prevLen == curLen && prevMatch.first) { if (prevMatch.second->GetCreateTime() > config.second->GetCreateTime()) { prevMatch = config; prevLen = curLen; } } } else { // save ForceMultiConfig allConfig.push_back(config); } } } bool alarmFlag = false; // file include in multi config, find config for path will not trigger this alarm // do not send alarm to config with mForceMultiConfig if (nameRepeat > 1 && !acceptMultiConfig && prevMatch.first) { alarmFlag = true; LOG_ERROR(sLogger, ("file", path + '/' + name)("include in multi config", logNameList)("best", prevMatch.second->GetConfigName())); for (auto iter = multiConfigs.begin(); iter != multiConfigs.end(); ++iter) { AlarmManager::GetInstance()->SendAlarm( MULTI_CONFIG_MATCH_ALARM, path + '/' + name + " include in multi config, best and oldest match: " + prevMatch.second->GetProjectName() + ',' + prevMatch.second->GetLogstoreName() + ',' + prevMatch.second->GetConfigName() + ", allmatch: " + logNameList, (*iter).second->GetRegion(), (*iter).second->GetProjectName(), (*iter).second->GetConfigName(), (*iter).second->GetLogstoreName()); } } if (prevMatch.first) { allConfig.push_back(prevMatch); } { ScopedSpinLock cachedLock(mCacheFileAllConfigMapLock); // use operator [], force update time mCacheFileAllConfigMap[cachedFileKey] = std::make_pair(allConfig, alarmFlag ? (int32_t)time(NULL) : (int32_t)0); } return (int32_t)allConfig.size(); } void ConfigManager::SendAllMatchAlarm(const string& path, const string& name, vector<FileDiscoveryConfig>& allConfig, int32_t maxMultiConfigSize) { string allConfigNames; for (auto iter = allConfig.begin(); iter != allConfig.end(); ++iter) { allConfigNames.append("[") .append((*iter).second->GetProjectName()) .append(" : ") .append((*iter).second->GetLogstoreName()) .append(" : ") .append((*iter).second->GetConfigName()) .append("]"); } LOG_ERROR(sLogger, ("file", path + '/' + name)("include in too many configs", allConfig.size())( "max multi config size", maxMultiConfigSize)("allconfigs", allConfigNames)); for (auto iter = allConfig.begin(); iter != allConfig.end(); ++iter) AlarmManager::GetInstance()->SendAlarm( TOO_MANY_CONFIG_ALARM, path + '/' + name + " include in too many configs:" + ToString(allConfig.size()) + ", max multi config size : " + ToString(maxMultiConfigSize) + ", allmatch: " + allConfigNames, (*iter).second->GetRegion(), (*iter).second->GetProjectName(), (*iter).second->GetConfigName(), (*iter).second->GetLogstoreName()); } void ConfigManager::AddHandlerToDelete(EventHandler* handler) { mHandlersToDelete.push_back(handler); } void ConfigManager::DeleteHandlers() { for (size_t i = 0; i < mHandlersToDelete.size(); ++i) { if (mHandlersToDelete[i] != mSharedHandler && mHandlersToDelete[i] != NULL) delete mHandlersToDelete[i]; } mHandlersToDelete.clear(); } // GetRelatedConfigs calculates related configs of @path. // Two kind of relations: // 1. No wildcard path: the base path of Config is the prefix of @path and within depth. // 2. Wildcard path: @path matches and within depth. void ConfigManager::GetRelatedConfigs(const std::string& path, std::vector<FileDiscoveryConfig>& configs) { const auto& nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); for (auto iter = nameConfigMap.begin(); iter != nameConfigMap.end(); ++iter) { if (iter->second.first->IsMatch(path, "")) { configs.push_back(iter->second); } } } bool ConfigManager::UpdateContainerPath(ConfigContainerInfoUpdateCmd* cmd) { mContainerInfoCmdLock.lock(); mContainerInfoCmdVec.push_back(cmd); mContainerInfoCmdLock.unlock(); return true; } bool ConfigManager::DoUpdateContainerPaths() { mContainerInfoCmdLock.lock(); std::vector<ConfigContainerInfoUpdateCmd*> tmpPathCmdVec = mContainerInfoCmdVec; mContainerInfoCmdVec.clear(); mContainerInfoCmdLock.unlock(); LOG_INFO(sLogger, ("update container path", tmpPathCmdVec.size())); for (size_t i = 0; i < tmpPathCmdVec.size(); ++i) { FileDiscoveryConfig config = FileServer::GetInstance()->GetFileDiscoveryConfig(tmpPathCmdVec[i]->mConfigName); if (!config.first) { LOG_ERROR(sLogger, ("invalid container path update cmd", tmpPathCmdVec[i]->mConfigName)("params", tmpPathCmdVec[i]->mJsonParams.toStyledString())); continue; } if (tmpPathCmdVec[i]->mDeleteFlag) { if (config.first->DeleteContainerInfo(tmpPathCmdVec[i]->mJsonParams)) { LOG_DEBUG(sLogger, ("container path delete cmd success", tmpPathCmdVec[i]->mConfigName)("params", tmpPathCmdVec[i]->mJsonParams.toStyledString())); } else { LOG_ERROR(sLogger, ("container path delete cmd fail", tmpPathCmdVec[i]->mConfigName)("params", tmpPathCmdVec[i]->mJsonParams.toStyledString())); } } else { if (config.first->UpdateContainerInfo(tmpPathCmdVec[i]->mJsonParams, config.second)) { LOG_DEBUG(sLogger, ("container path update cmd success", tmpPathCmdVec[i]->mConfigName)("params", tmpPathCmdVec[i]->mJsonParams.toStyledString())); } else { LOG_ERROR(sLogger, ("container path update cmd fail", tmpPathCmdVec[i]->mConfigName)("params", tmpPathCmdVec[i]->mJsonParams.toStyledString())); } } delete tmpPathCmdVec[i]; } return true; } bool ConfigManager::IsUpdateContainerPaths() { mContainerInfoCmdLock.lock(); bool rst = false; for (size_t i = 0; i < mContainerInfoCmdVec.size(); ++i) { ConfigContainerInfoUpdateCmd* pCmd = mContainerInfoCmdVec[i]; if (pCmd->mDeleteFlag) { rst = true; break; } FileDiscoveryConfig pConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(pCmd->mConfigName); if (!pConfig.first) { continue; } if (!pConfig.first->IsSameContainerInfo(pCmd->mJsonParams, pConfig.second)) { rst = true; break; } } if (mContainerInfoCmdVec.size() > 0) { LOG_INFO(sLogger, ("check container path update flag", rst)("size", mContainerInfoCmdVec.size())); } if (rst == false) { for (size_t i = 0; i < mContainerInfoCmdVec.size(); ++i) { delete mContainerInfoCmdVec[i]; } mContainerInfoCmdVec.clear(); } mContainerInfoCmdLock.unlock(); /********** qps limit : only update docker config INT32_FLAG(max_docker_config_update_times) times in 3 minutes * ********/ static int32_t s_lastUpdateTime = 0; static int32_t s_lastUpdateCount = 0; if (!rst) { return rst; } int32_t nowTime = time(NULL); // not in 3 minutes window if (nowTime / 180 != s_lastUpdateTime / 180) { s_lastUpdateCount = 1; s_lastUpdateTime = nowTime; return rst; } // this window's update times < INT32_FLAG(max_docker_config_update_times) // min interval : 10 seconds // For case with frequent container update (eg. K8s short job), adjust this parameter. if (s_lastUpdateCount < INT32_FLAG(max_docker_config_update_times) && nowTime - s_lastUpdateTime >= INT32_FLAG(docker_config_update_interval)) { ++s_lastUpdateCount; s_lastUpdateTime = nowTime; return rst; } // return false if s_lastUpdateCount >= INT32_FLAG(max_docker_config_update_times) and last update time is in same // window return false; /************************************************************************************************************************/ } bool ConfigManager::UpdateContainerStopped(ConfigContainerInfoUpdateCmd* cmd) { PTScopedLock lock(mDockerContainerStoppedCmdLock); mDockerContainerStoppedCmdVec.push_back(cmd); return true; } void ConfigManager::GetContainerStoppedEvents(std::vector<Event*>& eventVec) { std::vector<ConfigContainerInfoUpdateCmd*> cmdVec; { PTScopedLock lock(mDockerContainerStoppedCmdLock); cmdVec.swap(mDockerContainerStoppedCmdVec); } for (auto& cmd : cmdVec) { // find config and container's path, then emit stopped event FileDiscoveryConfig config = FileServer::GetInstance()->GetFileDiscoveryConfig(cmd->mConfigName); if (!config.first) { continue; } ContainerInfo containerInfo; std::string errorMsg; if (!ContainerInfo::ParseByJSONObj(cmd->mJsonParams, containerInfo, errorMsg)) { LOG_ERROR(sLogger, ("invalid container info update param", errorMsg)("action", "ignore current cmd")); continue; } std::vector<ContainerInfo>::iterator iter = config.first->GetContainerInfo()->begin(); std::vector<ContainerInfo>::iterator iend = config.first->GetContainerInfo()->end(); for (; iter != iend; ++iter) { if (iter->mID == containerInfo.mID) { break; } } if (iter == iend) { continue; } Event* pStoppedEvent = new Event(iter->mRealBaseDir, "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, -1, 0); pStoppedEvent->SetConfigName(cmd->mConfigName); pStoppedEvent->SetContainerID(containerInfo.mID); iter->mStopped = true; LOG_DEBUG( sLogger, ("GetContainerStoppedEvent Type", pStoppedEvent->GetType())("Source", pStoppedEvent->GetSource())( "Object", pStoppedEvent->GetObject())("Config", pStoppedEvent->GetConfigName())( "IsDir", pStoppedEvent->IsDir())("IsCreate", pStoppedEvent->IsCreate())("IsModify", pStoppedEvent->IsModify())( "IsDeleted", pStoppedEvent->IsDeleted())("IsMoveFrom", pStoppedEvent->IsMoveFrom())( "IsContainerStopped", pStoppedEvent->IsContainerStopped())("IsMoveTo", pStoppedEvent->IsMoveTo())); eventVec.push_back(pStoppedEvent); } for (auto cmd : cmdVec) { delete cmd; } } void ConfigManager::SaveDockerConfig() { string dockerPathConfigName = AppConfig::GetInstance()->GetDockerFilePathConfig(); Json::Value dockerPathValueRoot; dockerPathValueRoot["version"] = Json::Value(STRING_FLAG(ilogtail_docker_path_version)); Json::Value dockerPathValueDetail; mContainerInfoCmdLock.lock(); const auto& nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); for (auto it = nameConfigMap.begin(); it != nameConfigMap.end(); ++it) { if (it->second.first->GetContainerInfo()) { std::vector<ContainerInfo>& containerPathVec = *(it->second.first->GetContainerInfo()); for (size_t i = 0; i < containerPathVec.size(); ++i) { Json::Value dockerPathValue; dockerPathValue["config_name"] = Json::Value(it->first); dockerPathValue["container_id"] = Json::Value(containerPathVec[i].mID); containerPathVec[i].mJson["Path"] = Json::Value(containerPathVec[i].mRealBaseDir); dockerPathValue["params"] = Json::Value(containerPathVec[i].mJson.toStyledString()); dockerPathValueDetail.append(dockerPathValue); } } } mContainerInfoCmdLock.unlock(); dockerPathValueRoot["detail"] = dockerPathValueDetail; string dockerInfo = dockerPathValueRoot.toStyledString(); OverwriteFile(dockerPathConfigName, dockerInfo); LOG_INFO(sLogger, ("dump docker path info", dockerPathConfigName)); } void ConfigManager::LoadDockerConfig() { string dockerPathConfigName = AppConfig::GetInstance()->GetDockerFilePathConfig(); Json::Value dockerPathValueRoot; ParseConfResult rst = ParseConfig(dockerPathConfigName, dockerPathValueRoot); if (rst == CONFIG_INVALID_FORMAT) { LOG_ERROR(sLogger, ("invalid docker config json", rst)("file path", dockerPathConfigName)); } if (rst != CONFIG_OK) { return; } if (!dockerPathValueRoot.isMember("detail")) { return; } Json::Value dockerPathValueDetail = dockerPathValueRoot["detail"]; if (!dockerPathValueDetail.isArray()) { return; } std::vector<ConfigContainerInfoUpdateCmd*> localPaths; for (Json::Value::iterator iter = dockerPathValueDetail.begin(); iter != dockerPathValueDetail.end(); ++iter) { const Json::Value& dockerPathItem = *iter; string configName = dockerPathItem.isMember("config_name") && dockerPathItem["config_name"].isString() ? dockerPathItem["config_name"].asString() : string(); string containerID = dockerPathItem.isMember("container_id") && dockerPathItem["container_id"].isString() ? dockerPathItem["container_id"].asString() : string(); string params = dockerPathItem.isMember("params") && dockerPathItem["params"].isString() ? dockerPathItem["params"].asString() : string(); LOG_INFO(sLogger, ("load docker path, config", configName)("container id", containerID)("params", params)); if (configName.empty() || containerID.empty() || params.empty()) { continue; } // cmd 解析json Json::Value jsonParams; std::string errorMsg; if (params.size() < 5UL || !ParseJsonTable(params, jsonParams, errorMsg)) { LOG_ERROR(sLogger, ("invalid docker container params", params)("errorMsg", errorMsg)); continue; } ConfigContainerInfoUpdateCmd* cmd = new ConfigContainerInfoUpdateCmd(configName, false, jsonParams); localPaths.push_back(cmd); } mContainerInfoCmdLock.lock(); localPaths.insert(localPaths.end(), mContainerInfoCmdVec.begin(), mContainerInfoCmdVec.end()); mContainerInfoCmdVec = localPaths; mContainerInfoCmdLock.unlock(); DoUpdateContainerPaths(); } void ConfigManager::ClearFilePipelineMatchCache() { ScopedSpinLock lock(mCacheFileConfigMapLock); mCacheFileConfigMap.clear(); ScopedSpinLock allLock(mCacheFileAllConfigMapLock); mCacheFileAllConfigMap.clear(); } #ifdef APSARA_UNIT_TEST_MAIN void ConfigManager::CleanEnviroments() { for (std::unordered_map<std::string, EventHandler*>::iterator iter = mDirEventHandlerMap.begin(); iter != mDirEventHandlerMap.end(); ++iter) { if (iter->second != mSharedHandler && iter->second) delete iter->second; } mDirEventHandlerMap.clear(); } #endif } // namespace logtail