bool CollectionConfig::Parse()

in core/config/CollectionConfig.cpp [96:685]


// Validates the JSON config held in mDetail and caches pointers to the plugin definitions it contains.
//
// Steps, in order:
//   1. replace env var references in the config when the enable_env_ref_in_config flag is set;
//   2. pre-scan the flushers module for flusher_sls to pick up Project/Logstore/Region, which are passed to
//      every warning/error reported afterwards;
//   3. read the optional createTime param and the optional global module;
//   4. validate inputs, processors and flushers (in that order), recording which of them are native and which
//      are extended (Go) plugins and rejecting unsupported combinations;
//   5. build mRouter from the optional "Match" param of each flusher;
//   6. validate the optional aggregators and extensions modules.
//
// Returns true when every check passes. Each PARAM_ERROR_RETURN site is a failure exit.
// NOTE(review): PARAM_ERROR_RETURN is defined elsewhere; the code below relies on it leaving the function
// (e.g. itr is dereferenced right after a null check that invokes it) - presumably it logs, sends an alarm and
// returns false. Members already filled (mInputs, mProcessors, ...) are not rolled back on failure.
bool CollectionConfig::Parse() {
    if (BOOL_FLAG(enable_env_ref_in_config)) {
        if (ReplaceEnvVar()) {
            LOG_INFO(sLogger, ("env vars in config are replaced, config", mDetail->toStyledString())("config", mName));
        }
    }

    string key, errorMsg;
    const Json::Value* itr = nullptr;
    AlarmManager& alarm = *AlarmManager::GetInstance();
    // to send alarm and init MetricsRecord, project, logstore and region should be extracted first.
    // This pre-scan is best effort: the results of GetMandatoryStringParam are intentionally ignored here, and
    // the flushers module is fully validated later on.
    key = "flushers";
    itr = mDetail->find(key.c_str(), key.c_str() + key.size());
    if (itr && itr->isArray()) {
        for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
            const Json::Value& plugin = (*itr)[i];
            if (plugin.isObject()) {
                key = "Type";
                const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
                if (it && it->isString() && it->asString() == "flusher_sls") {
                    GetMandatoryStringParam(plugin, "Project", mProject, errorMsg);
                    GetMandatoryStringParam(plugin, "Logstore", mLogstore, errorMsg);
                    GetMandatoryStringParam(plugin, "Region", mRegion, errorMsg);
                }
            }
        }
    }

    // an invalid createTime only triggers a warning; parsing goes on
    if (!GetOptionalUIntParam(*mDetail, "createTime", mCreateTime, errorMsg)) {
        PARAM_WARNING_DEFAULT(sLogger, alarm, errorMsg, mCreateTime, noModule, mName, mProject, mLogstore, mRegion);
    }

    // the global module is optional, but must be an object when present
    key = "global";
    itr = mDetail->find(key.c_str(), key.c_str() + key.size());
    if (itr) {
        if (!itr->isObject()) {
            PARAM_ERROR_RETURN(
                sLogger, alarm, "global module is not of type object", noModule, mName, mProject, mLogstore, mRegion);
        }
        mGlobal = itr;
    }

    // inputs, processors and flushers module must be parsed first and parsed by order, since aggregators and
    // extensions module parsing will rely on their results.
    bool hasFileInput = false;
    key = "inputs";
    itr = mDetail->find(key.c_str(), key.c_str() + key.size());
    if (!itr) {
        PARAM_ERROR_RETURN(
            sLogger, alarm, "mandatory inputs module is missing", noModule, mName, mProject, mLogstore, mRegion);
    }
    if (!itr->isArray()) {
        PARAM_ERROR_RETURN(sLogger,
                           alarm,
                           "mandatory inputs module is not of type array",
                           noModule,
                           mName,
                           mProject,
                           mLogstore,
                           mRegion);
    }
    if (itr->empty()) {
        PARAM_ERROR_RETURN(
            sLogger, alarm, "mandatory inputs module has no plugin", noModule, mName, mProject, mLogstore, mRegion);
    }
    for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
        const Json::Value& plugin = (*itr)[i];
        if (!plugin.isObject()) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "param inputs[" + ToString(i) + "] is not of type object",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        key = "Type";
        const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
        if (it == nullptr) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "param inputs[" + ToString(i) + "].Type is missing",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        if (!it->isString()) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "param inputs[" + ToString(i) + "].Type is not of type string",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        const string pluginType = it->asString();
        // when input is singleton, there should be only one input to simplify config load transaction
        if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) {
            mSingletonInput = pluginType;
            if (itr->size() > 1) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "more than 1 input plugin is given when global singleton input plugin is used",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
        }
        // the first input decides whether the inputs are native or extended (Go); every following input must
        // be of the same kind
        if (i == 0) {
            if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
                mHasGoInput = true;
            } else if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) {
                mHasNativeInput = true;
            } else {
                PARAM_ERROR_RETURN(
                    sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion);
            }
        } else {
            if (mHasGoInput) {
                if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) {
                    PARAM_ERROR_RETURN(sLogger,
                                       alarm,
                                       "native and extended input plugins coexist",
                                       noModule,
                                       mName,
                                       mProject,
                                       mLogstore,
                                       mRegion);
                } else if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
                    PARAM_ERROR_RETURN(
                        sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion);
                }
            } else {
                if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
                    PARAM_ERROR_RETURN(sLogger,
                                       alarm,
                                       "native and extended input plugins coexist",
                                       noModule,
                                       mName,
                                       mProject,
                                       mLogstore,
                                       mRegion);
                } else if (!PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) {
                    PARAM_ERROR_RETURN(
                        sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion);
                }
            }
        }
        mInputs.push_back(&plugin);
#ifndef APSARA_UNIT_TEST_MAIN
        // TODO: remove these special restrictions
        if (pluginType == "input_file" || pluginType == "input_container_stdio") {
            hasFileInput = true;
        }
#else
        // TODO: remove these special restrictions after all C++ inputs support Go processors
        // in unit test builds a substring match is used instead of an exact match
        if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) {
            hasFileInput = true;
        }
#endif
    }
    // TODO: remove these special restrictions
    // itr still points to the validated inputs array here, so no second lookup in mDetail is needed
    if (hasFileInput && itr->size() > 1) {
        PARAM_ERROR_RETURN(sLogger,
                           alarm,
                           "more than 1 input_file or input_container_stdio is given",
                           noModule,
                           mName,
                           mProject,
                           mLogstore,
                           mRegion);
    }

    // the processors module is optional
    key = "processors";
    itr = mDetail->find(key.c_str(), key.c_str() + key.size());
    if (itr) {
        if (!itr->isArray()) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "processors module is not of type array",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        // stays true until the first extended (Go) processor is met; used to reject a native processor that
        // comes after an extended one
        bool isCurrentPluginNative = true;
        for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
            const Json::Value& plugin = (*itr)[i];
            if (!plugin.isObject()) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "param processors[" + ToString(i) + "] is not of type object",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            key = "Type";
            const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
            if (it == nullptr) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "param processors[" + ToString(i) + "].Type is missing",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            if (!it->isString()) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "param processors[" + ToString(i) + "].Type is not of type string",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            const string pluginType = it->asString();
            if (mHasGoInput) {
                // extended inputs only accept extended processors
                if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
                    PARAM_ERROR_RETURN(sLogger,
                                       alarm,
                                       "native processor plugins coexist with extended input plugins",
                                       noModule,
                                       mName,
                                       mProject,
                                       mLogstore,
                                       mRegion);
                } else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
                    mHasGoProcessor = true;
                } else {
                    PARAM_ERROR_RETURN(sLogger,
                                       alarm,
                                       "unsupported processor plugin",
                                       pluginType,
                                       mName,
                                       mProject,
                                       mLogstore,
                                       mRegion);
                }
            } else {
                if (isCurrentPluginNative) {
                    if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
                        // TODO: remove these special restrictions
                        if (!hasFileInput) {
                            PARAM_ERROR_RETURN(sLogger,
                                               alarm,
                                               "extended processor plugins coexist with native input plugins other "
                                               "than input_file or input_container_stdio",
                                               noModule,
                                               mName,
                                               mProject,
                                               mLogstore,
                                               mRegion);
                        }
                        isCurrentPluginNative = false;
                        mHasGoProcessor = true;
                    } else if (!PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
                        PARAM_ERROR_RETURN(sLogger,
                                           alarm,
                                           "unsupported processor plugin",
                                           pluginType,
                                           mName,
                                           mProject,
                                           mLogstore,
                                           mRegion);
                    } else if (pluginType == "processor_spl") {
                        // processor_spl is only accepted as the sole processor
                        if (i != 0 || itr->size() != 1) {
                            PARAM_ERROR_RETURN(sLogger,
                                               alarm,
                                               "native processor plugins coexist with spl processor",
                                               noModule,
                                               mName,
                                               mProject,
                                               mLogstore,
                                               mRegion);
                        }
                        mHasNativeProcessor = true;
                    } else {
                        mHasNativeProcessor = true;
                    }
                } else {
                    if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
                        PARAM_ERROR_RETURN(sLogger,
                                           alarm,
                                           "native processor plugin comes after extended processor plugin",
                                           pluginType,
                                           mName,
                                           mProject,
                                           mLogstore,
                                           mRegion);
                    } else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
                        mHasGoProcessor = true;
                    } else {
                        PARAM_ERROR_RETURN(sLogger,
                                           alarm,
                                           "unsupported processor plugin",
                                           pluginType,
                                           mName,
                                           mProject,
                                           mLogstore,
                                           mRegion);
                    }
                }
            }
            mProcessors.push_back(&plugin);
            if (i == 0) {
                if (pluginType == "processor_parse_json_native" || pluginType == "processor_json") {
                    mIsFirstProcessorJson = true;
                }
            }
        }
    }

    bool hasFlusherSLS = false;
    uint32_t nativeFlusherCnt = 0;
    key = "flushers";
    itr = mDetail->find(key.c_str(), key.c_str() + key.size());
    if (!itr) {
        PARAM_ERROR_RETURN(
            sLogger, alarm, "mandatory flushers module is missing", noModule, mName, mProject, mLogstore, mRegion);
    }
    if (!itr->isArray()) {
        PARAM_ERROR_RETURN(sLogger,
                           alarm,
                           "mandatory flushers module is not of type array",
                           noModule,
                           mName,
                           mProject,
                           mLogstore,
                           mRegion);
    }
    if (itr->empty()) {
        PARAM_ERROR_RETURN(
            sLogger, alarm, "mandatory flushers module has no plugin", noModule, mName, mProject, mLogstore, mRegion);
    }
    for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
        const Json::Value& plugin = (*itr)[i];
        if (!plugin.isObject()) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "param flushers[" + ToString(i) + "] is not of type object",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        key = "Type";
        const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
        if (it == nullptr) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "param flushers[" + ToString(i) + "].Type is missing",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        if (!it->isString()) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "param flushers[" + ToString(i) + "].Type is not of type string",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        const string pluginType = it->asString();
        if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
            // TODO: remove these special restrictions
            if (mHasNativeInput && !hasFileInput) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "extended flusher plugins coexist with native input plugins other than "
                                   "input_file or input_container_stdio",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            mHasGoFlusher = true;
        } else if (PluginRegistry::GetInstance()->IsValidNativeFlusherPlugin(pluginType)) {
            mHasNativeFlusher = true;
            // TODO: remove these special restrictions
            ++nativeFlusherCnt;
            if (pluginType == "flusher_sls") {
                hasFlusherSLS = true;
            }
        } else {
            PARAM_ERROR_RETURN(
                sLogger, alarm, "unsupported flusher plugin", pluginType, mName, mProject, mLogstore, mRegion);
        }
        mFlushers.push_back(&plugin);
    }
    // TODO: remove these special restrictions
    // together with extended flushers, at most one native flusher is accepted and it must be flusher_sls
    if (mHasGoFlusher && nativeFlusherCnt > 1) {
        PARAM_ERROR_RETURN(sLogger,
                           alarm,
                           "more than 1 native flusher plugins coexist with extended flusher plugins",
                           noModule,
                           mName,
                           mProject,
                           mLogstore,
                           mRegion);
    }
    if (mHasGoFlusher && nativeFlusherCnt == 1 && !hasFlusherSLS) {
        PARAM_ERROR_RETURN(sLogger,
                           alarm,
                           "native flusher plugins other than flusher_sls coexist with extended flusher plugins",
                           noModule,
                           mName,
                           mProject,
                           mLogstore,
                           mRegion);
    }

    // build the router: each entry pairs a flusher index with its "Match" param; a flusher without "Match" is
    // registered with nullptr, but only when IsFlushingThroughGoPipelineExisted() is false
    key = "Match";
    for (size_t i = 0; i < mFlushers.size(); ++i) {
        // named match so that the function-level itr, which is reused below, is not shadowed
        auto match = mFlushers[i]->find(key.c_str(), key.c_str() + key.size());
        if (match) {
            if (IsFlushingThroughGoPipelineExisted()) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "route found in non-native flushing mode",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            mRouter.emplace_back(i, match);
        } else {
            if (!IsFlushingThroughGoPipelineExisted()) {
                mRouter.emplace_back(i, nullptr);
            }
        }
    }

    // the aggregators module is optional; when present it must hold exactly one extended plugin
    key = "aggregators";
    itr = mDetail->find(key.c_str(), key.c_str() + key.size());
    if (itr) {
        if (!IsFlushingThroughGoPipelineExisted()) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "aggregator plugins exist in native flushing mode",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        if (!itr->isArray()) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "aggregators module is not of type array",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        // an empty array is rejected as well, with its own message so that the report matches the cause
        if (itr->empty()) {
            PARAM_ERROR_RETURN(
                sLogger, alarm, "aggregators module has no plugin", noModule, mName, mProject, mLogstore, mRegion);
        }
        if (itr->size() > 1) {
            PARAM_ERROR_RETURN(
                sLogger, alarm, "more than 1 aggregator is given", noModule, mName, mProject, mLogstore, mRegion);
        }
        for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
            const Json::Value& plugin = (*itr)[i];
            if (!plugin.isObject()) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "param aggregators[" + ToString(i) + "] is not of type object",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            key = "Type";
            const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
            if (it == nullptr) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "param aggregators[" + ToString(i) + "].Type is missing",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            if (!it->isString()) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "param aggregators[" + ToString(i) + "].Type is not of type string",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            const string pluginType = it->asString();
            if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
                PARAM_ERROR_RETURN(
                    sLogger, alarm, "unsupported aggregator plugin", pluginType, mName, mProject, mLogstore, mRegion);
            }
            mAggregators.push_back(&plugin);
        }
    }

    // the extensions module is optional and is rejected when HasGoPlugin() is false
    key = "extensions";
    itr = mDetail->find(key.c_str(), key.c_str() + key.size());
    if (itr) {
        if (!HasGoPlugin()) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "extension plugins exist when no extended plugin is given",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        if (!itr->isArray()) {
            PARAM_ERROR_RETURN(sLogger,
                               alarm,
                               "extensions module is not of type array",
                               noModule,
                               mName,
                               mProject,
                               mLogstore,
                               mRegion);
        }
        for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
            const Json::Value& plugin = (*itr)[i];
            if (!plugin.isObject()) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "param extensions[" + ToString(i) + "] is not of type object",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            key = "Type";
            const Json::Value* it = plugin.find(key.c_str(), key.c_str() + key.size());
            if (it == nullptr) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "param extensions[" + ToString(i) + "].Type is missing",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            if (!it->isString()) {
                PARAM_ERROR_RETURN(sLogger,
                                   alarm,
                                   "param extensions[" + ToString(i) + "].Type is not of type string",
                                   noModule,
                                   mName,
                                   mProject,
                                   mLogstore,
                                   mRegion);
            }
            const string pluginType = it->asString();
            if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
                PARAM_ERROR_RETURN(
                    sLogger, alarm, "unsupported extension plugin", pluginType, mName, mProject, mLogstore, mRegion);
            }
            mExtensions.push_back(&plugin);
        }
    }

    return true;
}