core/config/CollectionConfig.cpp (645 lines of code) (raw):

// Copyright 2023 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "config/CollectionConfig.h" #include <string> #include "boost/regex.hpp" #include "app_config/AppConfig.h" #include "collection_pipeline/plugin/PluginRegistry.h" #include "common/Flags.h" #include "common/ParamExtractor.h" DEFINE_FLAG_BOOL(enable_env_ref_in_config, "enable environment variable reference replacement in configuration", false); using namespace std; namespace logtail { static string UnescapeDollar(string::const_iterator beginIt, string::const_iterator endIt) { string outStr; string::const_iterator lastMatchEnd = beginIt; static boost::regex reg(R"(\$\$|\$})"); boost::regex_iterator<string::const_iterator> it{beginIt, endIt, reg}; boost::regex_iterator<string::const_iterator> end; for (; it != end; ++it) { outStr.append(lastMatchEnd, (*it)[0].first); // original part outStr.append((*it)[0].first + 1, (*it)[0].second); // skip $ char lastMatchEnd = (*it)[0].second; } outStr.append(lastMatchEnd, endIt); // original part return outStr; } static bool ReplaceEnvVarRefInStr(const string& inStr, string& outStr) { string::const_iterator lastMatchEnd = inStr.begin(); static boost::regex reg(R"((?<!\$)\${([\w]+)(:(.*?))?(?<!\$)})"); boost::regex_iterator<string::const_iterator> it{inStr.begin(), inStr.end(), reg}; boost::regex_iterator<string::const_iterator> end; if (it == end) { outStr.append(UnescapeDollar(lastMatchEnd, inStr.end())); // original part return false; } for (; it != end; ++it) { outStr.append(UnescapeDollar(lastMatchEnd, (*it)[0].first)); // original part char* env = getenv((*it)[1].str().c_str()); if (env != NULL) // replace to enviroment variable { outStr.append(env); } else if ((*it).size() == 4) // replace to default value { outStr.append((*it)[3].first, (*it)[3].second); } // else replace to empty string (do nothing) lastMatchEnd = (*it)[0].second; } outStr.append(UnescapeDollar(lastMatchEnd, inStr.end())); // original part return true; } static void ReplaceEnvVarRef(Json::Value& value, bool& res) { if (value.isString()) { string outStr; if (ReplaceEnvVarRefInStr(value.asString(), outStr)) { res = true; } Json::Value tempValue{outStr}; value.swapPayload(tempValue); } else if (value.isArray()) { Json::ValueIterator it = value.begin(); Json::ValueIterator end = value.end(); for (; it != end; ++it) { ReplaceEnvVarRef(*it, res); } } else if (value.isObject()) { Json::ValueIterator it = value.begin(); Json::ValueIterator end = value.end(); for (; it != end; ++it) { ReplaceEnvVarRef(*it, res); } } } bool CollectionConfig::Parse() { if (BOOL_FLAG(enable_env_ref_in_config)) { if (ReplaceEnvVar()) { LOG_INFO(sLogger, ("env vars in config are replaced, config", mDetail->toStyledString())("config", mName)); } } string key, errorMsg; const Json::Value* itr = nullptr; AlarmManager& alarm = *AlarmManager::GetInstance(); // to send alarm and init MetricsRecord, project, logstore and region should be extracted first. key = "flushers"; itr = mDetail->find(key.c_str(), key.c_str() + key.size()); if (itr && itr->isArray()) { for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) { const Json::Value& plugin = (*itr)[i]; if (plugin.isObject()) { key = "Type"; const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size()); if (it && it->isString() && it->asString() == "flusher_sls") { GetMandatoryStringParam(plugin, "Project", mProject, errorMsg); GetMandatoryStringParam(plugin, "Logstore", mLogstore, errorMsg); GetMandatoryStringParam(plugin, "Region", mRegion, errorMsg); } } } } if (!GetOptionalUIntParam(*mDetail, "createTime", mCreateTime, errorMsg)) { PARAM_WARNING_DEFAULT(sLogger, alarm, errorMsg, mCreateTime, noModule, mName, mProject, mLogstore, mRegion); } key = "global"; itr = mDetail->find(key.c_str(), key.c_str() + key.size()); if (itr) { if (!itr->isObject()) { PARAM_ERROR_RETURN( sLogger, alarm, "global module is not of type object", noModule, mName, mProject, mLogstore, mRegion); } mGlobal = itr; } // inputs, processors and flushers module must be parsed first and parsed by order, since aggregators and // extensions module parsing will rely on their results. bool hasFileInput = false; key = "inputs"; itr = mDetail->find(key.c_str(), key.c_str() + key.size()); if (!itr) { PARAM_ERROR_RETURN( sLogger, alarm, "mandatory inputs module is missing", noModule, mName, mProject, mLogstore, mRegion); } if (!itr->isArray()) { PARAM_ERROR_RETURN(sLogger, alarm, "mandatory inputs module is not of type array", noModule, mName, mProject, mLogstore, mRegion); } if (itr->empty()) { PARAM_ERROR_RETURN( sLogger, alarm, "mandatory inputs module has no plugin", noModule, mName, mProject, mLogstore, mRegion); } for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) { const Json::Value& plugin = (*itr)[i]; if (!plugin.isObject()) { PARAM_ERROR_RETURN(sLogger, alarm, "param inputs[" + ToString(i) + "] is not of type object", noModule, mName, mProject, mLogstore, mRegion); } key = "Type"; const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size()); if (it == nullptr) { PARAM_ERROR_RETURN(sLogger, alarm, "param inputs[" + ToString(i) + "].Type is missing", noModule, mName, mProject, mLogstore, mRegion); } if (!it->isString()) { PARAM_ERROR_RETURN(sLogger, alarm, "param inputs[" + ToString(i) + "].Type is not of type string", noModule, mName, mProject, mLogstore, mRegion); } const string pluginType = it->asString(); // when input is singleton, there should only one input to simpify config load transaction if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) { mSingletonInput = pluginType; if (itr->size() > 1) { PARAM_ERROR_RETURN(sLogger, alarm, "more than 1 input plugin is given when global singleton input plugin is used", noModule, mName, mProject, mLogstore, mRegion); } } if (i == 0) { if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { mHasGoInput = true; } else if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) { mHasNativeInput = true; } else { PARAM_ERROR_RETURN( sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion); } } else { if (mHasGoInput) { if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) { PARAM_ERROR_RETURN(sLogger, alarm, "native and extended input plugins coexist", noModule, mName, mProject, mLogstore, mRegion); } else if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { PARAM_ERROR_RETURN( sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion); } } else { if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { PARAM_ERROR_RETURN(sLogger, alarm, "native and extended input plugins coexist", noModule, mName, mProject, mLogstore, mRegion); } else if (!PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) { PARAM_ERROR_RETURN( sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion); } } } mInputs.push_back(&plugin); #ifndef APSARA_UNIT_TEST_MAIN // TODO: remove these special restrictions if (pluginType == "input_file" || pluginType == "input_container_stdio") { hasFileInput = true; } #else // TODO: remove these special restrictions after all C++ inputs support Go processors if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) { hasFileInput = true; } #endif } // TODO: remove these special restrictions if (hasFileInput && (*mDetail)["inputs"].size() > 1) { PARAM_ERROR_RETURN(sLogger, alarm, "more than 1 input_file or input_container_stdio is given", noModule, mName, mProject, mLogstore, mRegion); } key = "processors"; itr = mDetail->find(key.c_str(), key.c_str() + key.size()); if (itr) { if (!itr->isArray()) { PARAM_ERROR_RETURN(sLogger, alarm, "processors module is not of type array", noModule, mName, mProject, mLogstore, mRegion); } bool isCurrentPluginNative = true; for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) { const Json::Value& plugin = (*itr)[i]; if (!plugin.isObject()) { PARAM_ERROR_RETURN(sLogger, alarm, "param processors[" + ToString(i) + "] is not of type object", noModule, mName, mProject, mLogstore, mRegion); } key = "Type"; const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size()); if (it == nullptr) { PARAM_ERROR_RETURN(sLogger, alarm, "param processors[" + ToString(i) + "].Type is missing", noModule, mName, mProject, mLogstore, mRegion); } if (!it->isString()) { PARAM_ERROR_RETURN(sLogger, alarm, "param processors[" + ToString(i) + "].Type is not of type string", noModule, mName, mProject, mLogstore, mRegion); } const string pluginType = it->asString(); if (mHasGoInput) { if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) { PARAM_ERROR_RETURN(sLogger, alarm, "native processor plugins coexist with extended input plugins", noModule, mName, mProject, mLogstore, mRegion); } else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { mHasGoProcessor = true; } else { PARAM_ERROR_RETURN(sLogger, alarm, "unsupported processor plugin", pluginType, mName, mProject, mLogstore, mRegion); } } else { if (isCurrentPluginNative) { if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { // TODO: remove these special restrictions if (!hasFileInput) { PARAM_ERROR_RETURN(sLogger, alarm, "extended processor plugins coexist with native input plugins other " "than input_file or input_container_stdio", noModule, mName, mProject, mLogstore, mRegion); } isCurrentPluginNative = false; mHasGoProcessor = true; } else if (!PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) { PARAM_ERROR_RETURN(sLogger, alarm, "unsupported processor plugin", pluginType, mName, mProject, mLogstore, mRegion); } else if (pluginType == "processor_spl") { if (i != 0 || itr->size() != 1) { PARAM_ERROR_RETURN(sLogger, alarm, "native processor plugins coexist with spl processor", noModule, mName, mProject, mLogstore, mRegion); } mHasNativeProcessor = true; } else { mHasNativeProcessor = true; } } else { if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) { PARAM_ERROR_RETURN(sLogger, alarm, "native processor plugin comes after extended processor plugin", pluginType, mName, mProject, mLogstore, mRegion); } else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { mHasGoProcessor = true; } else { PARAM_ERROR_RETURN(sLogger, alarm, "unsupported processor plugin", pluginType, mName, mProject, mLogstore, mRegion); } } } mProcessors.push_back(&plugin); if (i == 0) { if (pluginType == "processor_parse_json_native" || pluginType == "processor_json") { mIsFirstProcessorJson = true; } } } } bool hasFlusherSLS = false; uint32_t nativeFlusherCnt = 0; key = "flushers"; itr = mDetail->find(key.c_str(), key.c_str() + key.size()); if (!itr) { PARAM_ERROR_RETURN( sLogger, alarm, "mandatory flushers module is missing", noModule, mName, mProject, mLogstore, mRegion); } if (!itr->isArray()) { PARAM_ERROR_RETURN(sLogger, alarm, "mandatory flushers module is not of type array", noModule, mName, mProject, mLogstore, mRegion); } if (itr->empty()) { PARAM_ERROR_RETURN( sLogger, alarm, "mandatory flushers module has no plugin", noModule, mName, mProject, mLogstore, mRegion); } for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) { const Json::Value& plugin = (*itr)[i]; if (!plugin.isObject()) { PARAM_ERROR_RETURN(sLogger, alarm, "param flushers[" + ToString(i) + "] is not of type object", noModule, mName, mProject, mLogstore, mRegion); } key = "Type"; const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size()); if (it == nullptr) { PARAM_ERROR_RETURN(sLogger, alarm, "param flushers[" + ToString(i) + "].Type is missing", noModule, mName, mProject, mLogstore, mRegion); } if (!it->isString()) { PARAM_ERROR_RETURN(sLogger, alarm, "param flushers[" + ToString(i) + "].Type is not of type string", noModule, mName, mProject, mLogstore, mRegion); } const string pluginType = it->asString(); if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { // TODO: remove these special restrictions if (mHasNativeInput && !hasFileInput) { PARAM_ERROR_RETURN(sLogger, alarm, "extended flusher plugins coexist with native input plugins other than " "input_file or input_container_stdio", noModule, mName, mProject, mLogstore, mRegion); } mHasGoFlusher = true; } else if (PluginRegistry::GetInstance()->IsValidNativeFlusherPlugin(pluginType)) { mHasNativeFlusher = true; // TODO: remove these special restrictions ++nativeFlusherCnt; if (pluginType == "flusher_sls") { hasFlusherSLS = true; } } else { PARAM_ERROR_RETURN( sLogger, alarm, "unsupported flusher plugin", pluginType, mName, mProject, mLogstore, mRegion); } mFlushers.push_back(&plugin); } // TODO: remove these special restrictions if (mHasGoFlusher && nativeFlusherCnt > 1) { PARAM_ERROR_RETURN(sLogger, alarm, "more than 1 native flusher plugins coexist with extended flusher plugins", noModule, mName, mProject, mLogstore, mRegion); } if (mHasGoFlusher && nativeFlusherCnt == 1 && !hasFlusherSLS) { PARAM_ERROR_RETURN(sLogger, alarm, "native flusher plugins other than flusher_sls coexist with extended flusher plugins", noModule, mName, mProject, mLogstore, mRegion); } key = "Match"; for (size_t i = 0; i < mFlushers.size(); ++i) { auto itr = mFlushers[i]->find(key.c_str(), key.c_str() + key.size()); if (itr) { if (IsFlushingThroughGoPipelineExisted()) { PARAM_ERROR_RETURN(sLogger, alarm, "route found in non-native flushing mode", noModule, mName, mProject, mLogstore, mRegion); } mRouter.emplace_back(i, itr); } else { if (!IsFlushingThroughGoPipelineExisted()) { mRouter.emplace_back(i, nullptr); } } } key = "aggregators"; itr = mDetail->find(key.c_str(), key.c_str() + key.size()); if (itr) { if (!IsFlushingThroughGoPipelineExisted()) { PARAM_ERROR_RETURN(sLogger, alarm, "aggregator plugins exist in native flushing mode", noModule, mName, mProject, mLogstore, mRegion); } if (!itr->isArray()) { PARAM_ERROR_RETURN(sLogger, alarm, "aggregators module is not of type array", noModule, mName, mProject, mLogstore, mRegion); } if (itr->size() != 1) { PARAM_ERROR_RETURN( sLogger, alarm, "more than 1 aggregator is given", noModule, mName, mProject, mLogstore, mRegion); } for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) { const Json::Value& plugin = (*itr)[i]; if (!plugin.isObject()) { PARAM_ERROR_RETURN(sLogger, alarm, "param aggregators[" + ToString(i) + "] is not of type object", noModule, mName, mProject, mLogstore, mRegion); } key = "Type"; const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size()); if (it == nullptr) { PARAM_ERROR_RETURN(sLogger, alarm, "param aggregators[" + ToString(i) + "].Type is missing", noModule, mName, mProject, mLogstore, mRegion); } if (!it->isString()) { PARAM_ERROR_RETURN(sLogger, alarm, "param aggregators[" + ToString(i) + "].Type is not of type string", noModule, mName, mProject, mLogstore, mRegion); } const string pluginType = it->asString(); if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { PARAM_ERROR_RETURN( sLogger, alarm, "unsupported aggregator plugin", pluginType, mName, mProject, mLogstore, mRegion); } mAggregators.push_back(&plugin); } } key = "extensions"; itr = mDetail->find(key.c_str(), key.c_str() + key.size()); if (itr) { if (!HasGoPlugin()) { PARAM_ERROR_RETURN(sLogger, alarm, "extension plugins exist when no extended plugin is given", noModule, mName, mProject, mLogstore, mRegion); } if (!itr->isArray()) { PARAM_ERROR_RETURN(sLogger, alarm, "extensions module is not of type array", noModule, mName, mProject, mLogstore, mRegion); } for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) { const Json::Value& plugin = (*itr)[i]; if (!plugin.isObject()) { PARAM_ERROR_RETURN(sLogger, alarm, "param extensions[" + ToString(i) + "] is not of type object", noModule, mName, mProject, mLogstore, mRegion); } key = "Type"; const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size()); if (it == nullptr) { PARAM_ERROR_RETURN(sLogger, alarm, "param extensions[" + ToString(i) + "].Type is missing", noModule, mName, mProject, mLogstore, mRegion); } if (!it->isString()) { PARAM_ERROR_RETURN(sLogger, alarm, "param extensions[" + ToString(i) + "].Type is not of type string", noModule, mName, mProject, mLogstore, mRegion); } const string pluginType = it->asString(); if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { PARAM_ERROR_RETURN( sLogger, alarm, "unsupported extension plugin", pluginType, mName, mProject, mLogstore, mRegion); } mExtensions.push_back(&plugin); } } return true; } bool CollectionConfig::ReplaceEnvVar() { bool res = false; ReplaceEnvVarRef(*mDetail, res); return res; } } // namespace logtail