in core/config/CollectionConfig.cpp [96:685]
bool CollectionConfig::Parse() {
if (BOOL_FLAG(enable_env_ref_in_config)) {
if (ReplaceEnvVar()) {
LOG_INFO(sLogger, ("env vars in config are replaced, config", mDetail->toStyledString())("config", mName));
}
}
string key, errorMsg;
const Json::Value* itr = nullptr;
AlarmManager& alarm = *AlarmManager::GetInstance();
// to send alarm and init MetricsRecord, project, logstore and region should be extracted first.
key = "flushers";
itr = mDetail->find(key.c_str(), key.c_str() + key.size());
if (itr && itr->isArray()) {
for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
const Json::Value& plugin = (*itr)[i];
if (plugin.isObject()) {
key = "Type";
const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
if (it && it->isString() && it->asString() == "flusher_sls") {
GetMandatoryStringParam(plugin, "Project", mProject, errorMsg);
GetMandatoryStringParam(plugin, "Logstore", mLogstore, errorMsg);
GetMandatoryStringParam(plugin, "Region", mRegion, errorMsg);
}
}
}
}
if (!GetOptionalUIntParam(*mDetail, "createTime", mCreateTime, errorMsg)) {
PARAM_WARNING_DEFAULT(sLogger, alarm, errorMsg, mCreateTime, noModule, mName, mProject, mLogstore, mRegion);
}
key = "global";
itr = mDetail->find(key.c_str(), key.c_str() + key.size());
if (itr) {
if (!itr->isObject()) {
PARAM_ERROR_RETURN(
sLogger, alarm, "global module is not of type object", noModule, mName, mProject, mLogstore, mRegion);
}
mGlobal = itr;
}
// inputs, processors and flushers module must be parsed first and parsed by order, since aggregators and
// extensions module parsing will rely on their results.
bool hasFileInput = false;
key = "inputs";
itr = mDetail->find(key.c_str(), key.c_str() + key.size());
if (!itr) {
PARAM_ERROR_RETURN(
sLogger, alarm, "mandatory inputs module is missing", noModule, mName, mProject, mLogstore, mRegion);
}
if (!itr->isArray()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"mandatory inputs module is not of type array",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (itr->empty()) {
PARAM_ERROR_RETURN(
sLogger, alarm, "mandatory inputs module has no plugin", noModule, mName, mProject, mLogstore, mRegion);
}
for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
const Json::Value& plugin = (*itr)[i];
if (!plugin.isObject()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param inputs[" + ToString(i) + "] is not of type object",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
key = "Type";
const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
if (it == nullptr) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param inputs[" + ToString(i) + "].Type is missing",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (!it->isString()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param inputs[" + ToString(i) + "].Type is not of type string",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
const string pluginType = it->asString();
// when input is singleton, there should only one input to simpify config load transaction
if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) {
mSingletonInput = pluginType;
if (itr->size() > 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 input plugin is given when global singleton input plugin is used",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
}
if (i == 0) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoInput = true;
} else if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) {
mHasNativeInput = true;
} else {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
} else {
if (mHasGoInput) {
if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native and extended input plugins coexist",
noModule,
mName,
mProject,
mLogstore,
mRegion);
} else if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
} else {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native and extended input plugins coexist",
noModule,
mName,
mProject,
mLogstore,
mRegion);
} else if (!PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
}
}
mInputs.push_back(&plugin);
#ifndef APSARA_UNIT_TEST_MAIN
// TODO: remove these special restrictions
if (pluginType == "input_file" || pluginType == "input_container_stdio") {
hasFileInput = true;
}
#else
// TODO: remove these special restrictions after all C++ inputs support Go processors
if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) {
hasFileInput = true;
}
#endif
}
// TODO: remove these special restrictions
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 input_file or input_container_stdio is given",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
key = "processors";
itr = mDetail->find(key.c_str(), key.c_str() + key.size());
if (itr) {
if (!itr->isArray()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"processors module is not of type array",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
bool isCurrentPluginNative = true;
for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
const Json::Value& plugin = (*itr)[i];
if (!plugin.isObject()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param processors[" + ToString(i) + "] is not of type object",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
key = "Type";
const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
if (it == nullptr) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param processors[" + ToString(i) + "].Type is missing",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (!it->isString()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param processors[" + ToString(i) + "].Type is not of type string",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
const string pluginType = it->asString();
if (mHasGoInput) {
if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugins coexist with extended input plugins",
noModule,
mName,
mProject,
mLogstore,
mRegion);
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoProcessor = true;
} else {
PARAM_ERROR_RETURN(sLogger,
alarm,
"unsupported processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
}
} else {
if (isCurrentPluginNative) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
// TODO: remove these special restrictions
if (!hasFileInput) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"extended processor plugins coexist with native input plugins other "
"than input_file or input_container_stdio",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
isCurrentPluginNative = false;
mHasGoProcessor = true;
} else if (!PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"unsupported processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
} else if (pluginType == "processor_spl") {
if (i != 0 || itr->size() != 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugins coexist with spl processor",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
mHasNativeProcessor = true;
} else {
mHasNativeProcessor = true;
}
} else {
if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugin comes after extended processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoProcessor = true;
} else {
PARAM_ERROR_RETURN(sLogger,
alarm,
"unsupported processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
}
}
}
mProcessors.push_back(&plugin);
if (i == 0) {
if (pluginType == "processor_parse_json_native" || pluginType == "processor_json") {
mIsFirstProcessorJson = true;
}
}
}
}
bool hasFlusherSLS = false;
uint32_t nativeFlusherCnt = 0;
key = "flushers";
itr = mDetail->find(key.c_str(), key.c_str() + key.size());
if (!itr) {
PARAM_ERROR_RETURN(
sLogger, alarm, "mandatory flushers module is missing", noModule, mName, mProject, mLogstore, mRegion);
}
if (!itr->isArray()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"mandatory flushers module is not of type array",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (itr->empty()) {
PARAM_ERROR_RETURN(
sLogger, alarm, "mandatory flushers module has no plugin", noModule, mName, mProject, mLogstore, mRegion);
}
for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
const Json::Value& plugin = (*itr)[i];
if (!plugin.isObject()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param flushers[" + ToString(i) + "] is not of type object",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
key = "Type";
const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
if (it == nullptr) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param flushers[" + ToString(i) + "].Type is missing",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (!it->isString()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param flushers[" + ToString(i) + "].Type is not of type string",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
const string pluginType = it->asString();
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
// TODO: remove these special restrictions
if (mHasNativeInput && !hasFileInput) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"extended flusher plugins coexist with native input plugins other than "
"input_file or input_container_stdio",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
mHasGoFlusher = true;
} else if (PluginRegistry::GetInstance()->IsValidNativeFlusherPlugin(pluginType)) {
mHasNativeFlusher = true;
// TODO: remove these special restrictions
++nativeFlusherCnt;
if (pluginType == "flusher_sls") {
hasFlusherSLS = true;
}
} else {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported flusher plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
mFlushers.push_back(&plugin);
}
// TODO: remove these special restrictions
if (mHasGoFlusher && nativeFlusherCnt > 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 native flusher plugins coexist with extended flusher plugins",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (mHasGoFlusher && nativeFlusherCnt == 1 && !hasFlusherSLS) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native flusher plugins other than flusher_sls coexist with extended flusher plugins",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
key = "Match";
for (size_t i = 0; i < mFlushers.size(); ++i) {
auto itr = mFlushers[i]->find(key.c_str(), key.c_str() + key.size());
if (itr) {
if (IsFlushingThroughGoPipelineExisted()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"route found in non-native flushing mode",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
mRouter.emplace_back(i, itr);
} else {
if (!IsFlushingThroughGoPipelineExisted()) {
mRouter.emplace_back(i, nullptr);
}
}
}
key = "aggregators";
itr = mDetail->find(key.c_str(), key.c_str() + key.size());
if (itr) {
if (!IsFlushingThroughGoPipelineExisted()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"aggregator plugins exist in native flushing mode",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (!itr->isArray()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"aggregators module is not of type array",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (itr->size() != 1) {
PARAM_ERROR_RETURN(
sLogger, alarm, "more than 1 aggregator is given", noModule, mName, mProject, mLogstore, mRegion);
}
for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
const Json::Value& plugin = (*itr)[i];
if (!plugin.isObject()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param aggregators[" + ToString(i) + "] is not of type object",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
key = "Type";
const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
if (it == nullptr) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param aggregators[" + ToString(i) + "].Type is missing",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (!it->isString()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param aggregators[" + ToString(i) + "].Type is not of type string",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
const string pluginType = it->asString();
if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported aggregator plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
mAggregators.push_back(&plugin);
}
}
key = "extensions";
itr = mDetail->find(key.c_str(), key.c_str() + key.size());
if (itr) {
if (!HasGoPlugin()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"extension plugins exist when no extended plugin is given",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (!itr->isArray()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"extensions module is not of type array",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
const Json::Value& plugin = (*itr)[i];
if (!plugin.isObject()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param extensions[" + ToString(i) + "] is not of type object",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
key = "Type";
const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
if (it == nullptr) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param extensions[" + ToString(i) + "].Type is missing",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
if (!it->isString()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"param extensions[" + ToString(i) + "].Type is not of type string",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
const string pluginType = it->asString();
if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported extension plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
mExtensions.push_back(&plugin);
}
}
return true;
}