core/app_config/AppConfig.h (277 lines of code) (raw):

/* * Copyright 2022 iLogtail Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <functional> #include <map> #include <string> #include <unordered_map> #include <unordered_set> #include <vector> #include "json/json.h" #include "InstanceConfig.h" #include "protobuf/sls/sls_logs.pb.h" namespace logtail { extern const int32_t kDefaultMaxSendBytePerSec; extern const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION; extern const int32_t MIN_SEND_REQUEST_CONCURRENCY; extern const int32_t MAX_SEND_REQUEST_CONCURRENCY; extern const uint32_t CONCURRENCY_STATISTIC_THRESHOLD; extern const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS; extern const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE; extern const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE; extern const std::string LOONGCOLLECTOR_ENV_PREFIX; void CreateAgentDir(); std::string GetAgentLogDir(); std::string GetAgentDataDir(); std::string GetAgentConfDir(); std::string GetAgentRunDir(); std::string GetAgentThirdPartyDir(); std::string GetBufferFileDir(); std::string GetHistoryDataPath(); std::string GetAgentGoCheckpointDir(); std::string GetAgentGoLogConfDir(); std::string GetAgentPrometheusAuthorizationPath(); std::string GetAgentConfigFile(); std::string GetAgentAppInfoFile(); std::string GetAdhocCheckpointDirPath(); std::string GetCheckPointFileName(); std::string GetCrashStackFileName(); std::string GetLocalEventDataFileName(); std::string GetInotifyWatcherDirsDumpFileName(); std::string GetAgentLoggersPrefix(); std::string GetAgentLogName(); std::string GetObserverEbpfHostPath(); std::string GetSendBufferFileNamePrefix(); std::string GetLegacyUserLocalConfigFilePath(); std::string GetExactlyOnceCheckpoint(); std::string GetContinuousPipelineConfigDir(); std::string GetPipelineConfigDir(); std::string GetPluginLogName(); std::string GetVersionTag(); std::string GetGoPluginCheckpoint(); std::string GetAgentName(); std::string GetMonitorInfoFileName(); std::string GetSymLinkName(); std::string GetAgentPrefix(); std::string GetFileTagsDir(); template <class T> class DoubleBuffer { public: DoubleBuffer() : currentBuffer(0) {} T& getWriteBuffer() { return buffers[currentBuffer]; } T& getReadBuffer() { return buffers[1 - currentBuffer]; } void swap() { currentBuffer = 1 - currentBuffer; } private: T buffers[2]; int currentBuffer; }; class AppConfig { private: static std::string sLocalConfigDir; void loadAppConfigLogtailMode(const std::string& ilogtailConfigFile); Json::Value mergeAllConfigs(); Json::Value mLocalInstanceConfig; Json::Value mEnvConfig; Json::Value mRemoteInstanceConfig; std::unordered_map<std::string, std::string> mLocalInstanceConfigKeyToConfigName; std::unordered_map<std::string, std::string> mEnvConfigKeyToConfigName; std::unordered_map<std::string, std::string> mRemoteInstanceConfigKeyToConfigName; std::map<std::string, std::function<bool()>*> mCallbacks; DoubleBuffer<std::vector<sls_logs::LogTag>> mFileTags; DoubleBuffer<std::map<std::string, std::string>> mAgentAttrs; Json::Value mFileTagsJson; mutable std::mutex mAppConfigLock; // loongcollector_config.json content for rebuild std::string mIlogtailConfigJson; // syslog // std::string mStreamLogAddress; // uint32_t mStreamLogTcpPort; // uint32_t mStreamLogPoolSizeInMb; // uint32_t mStreamLogRcvLenPerCall; // bool mOpenStreamLog; // performance float mCpuUsageUpLimit; #if defined(__linux__) || defined(__APPLE__) __attribute__((aligned(64))) int64_t mMemUsageUpLimit; #elif defined(_MSC_VER) int64_t mMemUsageUpLimit; #endif int32_t mProcessThreadCount; bool mInputFlowControl; bool mResourceAutoScale; float mMachineCpuUsageThreshold; float mScaledCpuUsageUpLimit; // sender int32_t mMaxHoldedDataSize; int32_t mMaxBufferNum; int32_t mBytePerSec; int32_t mMaxBytePerSec; int32_t mNumOfBufferFile; int32_t mLocalFileSize; int32_t mSendRequestConcurrency; int32_t mSendRequestGlobalConcurrency; std::string mBufferFilePath; // checkpoint std::string mCheckPointFilePath; // local config // std::string mMappingConfigPath; int32_t mMaxMultiConfigSize; bool mAcceptMultiConfigFlag; bool mIgnoreDirInodeChanged; // std::string mUserConfigPath; // std::string mUserLocalConfigPath; // std::string mUserLocalConfigDirPath; // std::string mUserLocalYamlConfigDirPath; // std::string mUserRemoteYamlConfigDirPath; bool mLogParseAlarmFlag; std::string mProcessExecutionDir; std::string mWorkingDir; // std::string mContainerMountConfigPath; std::string mConfigIP; std::string mConfigHostName; // std::string mAlipayZone; int32_t mSystemBootTime = -1; // used to get log config instead of mConfigIp if set, eg: "127.0.0.1.fuse", // std::string mCustomizedConfigIP; // config file path to save docker file cmd info std::string mDockerFilePathConfig; bool mNoInotify; // inotify black list, the path should not end with '/' std::unordered_multiset<std::string> mInotifyBlackList; int32_t mSendDataPort; // port to send data, not for config. bool mShennongSocket; // bind shennong domain socket or not. // tags load from env, this will been add to all loggroup std::vector<sls_logs::LogTag> mEnvTags; bool mPurageContainerMode; // Monitor thread will check last read event time, if exceeds this, // logtail will force quit, 7200s by default. int32_t mForceQuitReadTimeout; std::string mLoongcollectorConfDir; // MUST ends with path separator // For such security case: logtail -> proxy server + firewall (domain rule). // By default, logtail will construct HTTP request URL by concating host IP with // request path, this policy can be disabled by this parameters. bool mEnableHostIPReplace = true; // For cases that proxy or firewall might remove some headers in response from SLS. // By default, SLS response has an x-log-requestid header, this is used to verify // if the response is from SLS. // Introduced because of global acceleration mode, DCDN might return invalid response // in such case, which leads to data lost. bool mEnableResponseVerification = true; bool mEnableCheckpointSyncWrite = false; // SLS server will reject logs with time out of range, which might happen when local // system time is changed. // By enabling this feature, logtail will use the offset between server time and // local time to adjust logs' time automatically. bool mEnableLogTimeAutoAdjust = false; std::set<std::string> mDynamicPlugins; std::vector<std::string> mHostPathBlacklist; std::string mBindInterface; // /** // * @brief Load ConfigServer, DataServer and network interface // * // * @param confJson // */ // virtual void LoadAddrConfig(const Json::Value& confJson) = 0; /** * @brief Auto scale buffer, file and network parameters according to mem limit. * * If buffer_file_num * buffer_file_size > 4GB, then buffer_file_size will be reduced proprotionally. * * File parameters may be adjusted include * polling_max_stat_count, max_watch_dir_count, max_open_files_limit and etc. * The scaling factor is base on mem_limit_num / 2GB. * For example, if max_open_files_limit is set to 100,000 and mem_limit_num is set to 1GB, * then the effective max_open_files_limit value will be 50,000. * * Disable network flow control if max_bytes_per_sec > 30MB/s. */ void CheckAndAdjustParameters(); void MergeJson(Json::Value& mainConfJson, const Json::Value& subConfJson); void MergeJson(Json::Value& mainConfJson, const Json::Value& subConfJson, std::unordered_map<std::string, std::string>& keyToConfigName, const std::string& configName); /** * @brief Load *.json from config.d dir * * Load according to lexical order. Values in later file will overwrite former. * * @param confJson json value to append to */ void LoadIncludeConfig(Json::Value& confJson); // void LoadSyslogConf(const Json::Value& confJson); void DumpAllFlagsToMap(std::unordered_map<std::string, std::string>& flagMap); void ReadFlagsFromMap(const std::unordered_map<std::string, std::string>& flagMap); /** * @brief Overwrite gflags with the values in json * * @param confJson json value to parse from */ void ParseJsonToFlags(const Json::Value& confJson); /** * @brief Overwrite gflags with the values in environment variales * */ void RecurseParseJsonToFlags(const Json::Value& confJson, std::string prefix); /** * @brief Overwrite gflags with the values in environment variales Recursively * */ void ParseEnvToFlags(); /** * @brief Load resource related configs such as cpu, memory, buffer size, thread number, send concurrency. * * @param confJson json value to load from */ void LoadResourceConf(const Json::Value& confJson); void LoadOtherConf(const Json::Value& confJson); // void LoadGlobalFuseConf(const Json::Value& confJson); void SetIlogtailConfigJson(const std::string& configJson) { std::lock_guard<std::mutex> lock(mAppConfigLock); mIlogtailConfigJson = configJson; } // LoadEnvTags loads env tags from environment. // 1. load STRING_FLAG(default_env_tag_keys) to get all env keys // 2. load each keys to get env key // 3. if no such env key, value will be empty // eg. // env // ALIYUN_LOG_ENV_TAGS=a|b|c // a=1 // b=2 // tags // a : 1 // b : 2 // c : void LoadEnvTags(); // LoadEnvResourceLimit loads resource limit from env config. // Read values will replace corresponding configs in loongcollector_config.json. void LoadEnvResourceLimit(); // logtail is in purage container mode when STRING_FLAG(ilogtail_user_defined_id_env_name) exist and /logtail_host // exist void CheckPurageContainerMode(); bool CheckAndResetProxyEnv(); bool CheckAndResetProxyAddress(const char* envKey, std::string& address); static void InitEnvMapping(const std::string& envStr, std::map<std::string, std::string>& envMapping); static void InitEnvMapping(const std::string& envStr, Json::Value& envJson); static void SetConfigFlag(const std::string& flagName, const std::string& value); public: AppConfig(); ~AppConfig() {} void LoadInstanceConfig(const std::map<std::string, std::shared_ptr<InstanceConfig>>&); static AppConfig* GetInstance() { static AppConfig singleton; return &singleton; } // 初始化配置 void LoadAppConfig(const std::string& ilogtailConfigFile); void LoadLocalInstanceConfig(); // 获取全局参数方法 const Json::Value& GetLocalInstanceConfig() { return mLocalInstanceConfig; }; const Json::Value& GetEnvConfig() { return mEnvConfig; }; const Json::Value& GetRemoteInstanceConfig() { return mRemoteInstanceConfig; }; template <typename T> T MergeConfig(const T& defaultValue, const T& currentValue, const std::string& name, const std::function<bool(const std::string&, const T&)>& validateFn); int32_t MergeInt32(int32_t defaultValue, int32_t currentValue, const std::string& name, const std::function<bool(const std::string&, const int32_t)>& validateFn); int64_t MergeInt64(int64_t defaultValue, int64_t currentValue, const std::string& name, const std::function<bool(const std::string&, const int64_t)>& validateFn); bool MergeBool(bool defaultValue, bool currentValue, const std::string& name, const std::function<bool(const std::string&, const bool)>& validateFn); std::string MergeString(const std::string& defaultValue, const std::string& currentValue, const std::string& name, const std::function<bool(const std::string&, const std::string&)>& validateFn); double MergeDouble(double defaultValue, double currentValue, const std::string& name, const std::function<bool(const std::string&, const double)>& validateFn); // 注册回调 void RegisterCallback(const std::string& key, std::function<bool()>* callback); // 合并配置 std::string Merge(Json::Value& localConf, Json::Value& envConfig, Json::Value& remoteConf, std::string& name, std::function<bool(const std::string&, const std::string&)> validateFn); // 获取特定配置 // CPU限制参数等仅与框架相关的参数,计算逻辑可以放在AppConfig float GetMachineCpuUsageThreshold() const { return mMachineCpuUsageThreshold; } float GetScaledCpuUsageUpLimit() const { return mScaledCpuUsageUpLimit; } float GetCpuUsageUpLimit() const { return mCpuUsageUpLimit; } // 文件标签相关,获取从文件中来的tags std::vector<sls_logs::LogTag>& GetFileTags() { return mFileTags.getReadBuffer(); } // 更新从文件中来的tags void UpdateFileTags(); // Agent属性相关,获取从文件中来的attrs std::map<std::string, std::string>& GetAgentAttrs() { return mAgentAttrs.getReadBuffer(); } // 更新从文件中来的attrs void UpdateAgentAttrs(); // Legacy:获取各种参数 bool NoInotify() const { return mNoInotify; } bool IsInInotifyBlackList(const std::string& path) const; bool IsLogParseAlarmValid() const { return mLogParseAlarmFlag; } // std::string GetDefaultRegion() const; // void SetDefaultRegion(const std::string& region); // uint32_t GetStreamLogTcpPort() const { return mStreamLogTcpPort; } // const std::string& GetStreamLogAddress() const { return mStreamLogAddress; } // uint32_t GetStreamLogPoolSizeInMb() const { return mStreamLogPoolSizeInMb; } // uint32_t GetStreamLogRcvLenPerCall() const { return mStreamLogRcvLenPerCall; } // bool GetOpenStreamLog() const { return mOpenStreamLog; } std::string GetIlogtailConfigJson() { std::lock_guard<std::mutex> lock(mAppConfigLock); return mIlogtailConfigJson; } bool IsAcceptMultiConfig() const { return mAcceptMultiConfigFlag; } void SetAcceptMultiConfig(bool flag) { mAcceptMultiConfigFlag = flag; } int32_t GetMaxMultiConfigSize() const { return mMaxMultiConfigSize; } void SetMaxMultiConfigSize(int32_t maxSize) { mMaxMultiConfigSize = maxSize; } const std::string& GetCheckPointFilePath() const { return mCheckPointFilePath; } bool IsInputFlowControl() const { return mInputFlowControl; } bool IsResourceAutoScale() const { return mResourceAutoScale; } int64_t GetMemUsageUpLimit() const { return mMemUsageUpLimit; } int32_t GetMaxHoldedDataSize() const { return mMaxHoldedDataSize; } uint32_t GetMaxBufferNum() const { return mMaxBufferNum; } int32_t GetMaxBytePerSec() const { return mMaxBytePerSec; } void SetMaxBytePerSec(int32_t maxBytePerSec) { mMaxBytePerSec = maxBytePerSec; } int32_t GetBytePerSec() const { return mBytePerSec; } int32_t GetNumOfBufferFile() const { return mNumOfBufferFile; } int32_t GetLocalFileSize() const { return mLocalFileSize; } const std::string& GetBufferFilePath() const { return mBufferFilePath; } // 单地域并发度 int32_t GetSendRequestConcurrency() const { return mSendRequestConcurrency; } // 全局并发度 int32_t GetSendRequestGlobalConcurrency() const { return mSendRequestGlobalConcurrency; } double GetGlobalConcurrencyFreePercentageForOneRegion() const { return GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION; } int32_t GetProcessThreadCount() const { return mProcessThreadCount; } // const std::string& GetMappingConfigPath() const { return mMappingConfigPath; } // const std::string& GetUserConfigPath() const { return mUserConfigPath; } // const std::string& GetLocalUserConfigPath() const { return mUserLocalConfigPath; } // const std::string& GetLocalUserConfigDirPath() const { return mUserLocalConfigDirPath; } // const std::string& GetLocalUserYamlConfigDirPath() const { return mUserLocalYamlConfigDirPath; } // const std::string& GetRemoteUserYamlConfigDirPath() const { return mUserRemoteYamlConfigDirPath; } bool IgnoreDirInodeChanged() const { return mIgnoreDirInodeChanged; } void SetProcessExecutionDir(const std::string& dir) { mProcessExecutionDir = dir; } const std::string& GetProcessExecutionDir() { return mProcessExecutionDir; } void SetWorkingDir(const std::string& dir) { mWorkingDir = dir; } const std::string& GetWorkingDir() const { return mWorkingDir; } // const std::string& GetContainerMountConfigPath() const { return mContainerMountConfigPath; } const std::string& GetConfigIP() const { return mConfigIP; } // const std::string& GetCustomizedConfigIp() const { return mCustomizedConfigIP; } const std::string& GetConfigHostName() const { return mConfigHostName; } int32_t GetSystemBootTime() const { return mSystemBootTime; } const std::string& GetDockerFilePathConfig() const { return mDockerFilePathConfig; } int32_t GetDataServerPort() const { return mSendDataPort; } bool ShennongSocketEnabled() const { return mShennongSocket; } const std::vector<sls_logs::LogTag>& GetEnvTags() const { return mEnvTags; } bool IsPurageContainerMode() const { return mPurageContainerMode; } int32_t GetForceQuitReadTimeout() const { return mForceQuitReadTimeout; } // const std::string& GetAlipayZone() const { return mAlipayZone; } // If @dirPath is not accessible, GetProcessExecutionDir will be set. void SetLoongcollectorConfDir(const std::string& dirPath); const std::string& GetLoongcollectorConfDir() const { return mLoongcollectorConfDir; } inline bool IsHostIPReplacePolicyEnabled() const { return mEnableHostIPReplace; } inline bool IsResponseVerificationEnabled() const { return mEnableResponseVerification; } // EndpointAddressType GetConfigServerAddressNetType() const { return mConfigServerAddressNetType; } inline bool EnableCheckpointSyncWrite() const { return mEnableCheckpointSyncWrite; } inline bool EnableLogTimeAutoAdjust() const { return mEnableLogTimeAutoAdjust; } inline const std::set<std::string>& GetDynamicPlugins() const { return mDynamicPlugins; } bool IsHostPathMatchBlacklist(const std::string& dirPath) const; const Json::Value& GetConfig() const { return mLocalInstanceConfig; } const std::string& GetBindInterface() const { return mBindInterface; } #ifdef APSARA_UNIT_TEST_MAIN friend class SenderUnittest; friend class ConfigUpdatorUnittest; friend class MultiServerConfigUpdatorUnitest; friend class UtilUnittest; friend class AppConfigUnittest; friend class PipelineUnittest; friend class InputFileUnittest; friend class InputPrometheusUnittest; friend class InputContainerStdioUnittest; friend class BatcherUnittest; friend class EnterpriseSLSClientManagerUnittest; friend class FlusherRunnerUnittest; friend class PipelineUpdateUnittest; friend class ProcessorTagNativeUnittest; friend class EnterpriseConfigProviderUnittest; #endif }; } // namespace logtail