bool CollectionPipeline::Init()

in core/collection_pipeline/CollectionPipeline.cpp [75:359]


/// Initializes this pipeline from a parsed collection config.
///
/// Steps, in order:
///   1. copy basic attributes into the pipeline and its context;
///   2. build inputs, processors, aggregators and flushers: plugin types that PluginRegistry can create become
///      native instances, all others are appended to mGoPipelineWithInput / mGoPipelineWithoutInput;
///   3. initialize the router, extensions, global config and tag handling;
///   4. validate the exactly-once constraints;
///   5. load the Go pipelines (compiled out when APSARA_UNIT_TEST_MAIN is defined);
///   6. create the process queue and wire its feedback interfaces and downstream sender queues
///      (skipped when exactly-once is enabled on input_file);
///   7. register pipeline-level metrics.
///
/// @param config parsed config. mDetail is moved out of it, and a default aggregator may be appended to
///               config.mAggregators.
/// @return true on success. false as soon as any plugin, router or global config fails to initialize or the Go
///         pipelines fail to load; the PARAM_ERROR_RETURN sites presumably also return false (macro is defined
///         elsewhere — confirm). Members already populated at that point are not rolled back here.
bool CollectionPipeline::Init(CollectionConfig&& config) {
    mName = config.mName;
    // only mDetail is moved from; every other field of config read below is still intact
    mConfig = std::move(config.mDetail);
    mSingletonInput = config.mSingletonInput;
    mContext.SetConfigName(mName);
    mContext.SetCreateTime(config.mCreateTime);
    mContext.SetPipeline(*this);
    mContext.SetIsFirstProcessorJsonFlag(config.mIsFirstProcessorJson);
    mContext.SetHasNativeProcessorsFlag(config.mHasNativeProcessor);
    mContext.SetIsFlushingThroughGoPipelineFlag(config.IsFlushingThroughGoPipelineExisted());

    // for special treatment below
    const InputFile* inputFile = nullptr;
    const InputContainerStdio* inputContainerStdio = nullptr;
    bool hasFlusherSLS = false;

    // to send alarm and init MetricsRecord before flusherSLS is built, a temporary object is made, which will be
    // replaced by the real flusher_sls plugin once it is initialized in the flusher loop below.
    // NOTE(review): mContext keeps the address of this stack object. If no flusher_sls replaces it (or Init returns
    // early), the pointer dangles after this function returns — confirm that config.mProject is only non-empty when
    // a flusher_sls is configured, and that a failed pipeline's context is never read afterwards.
    FlusherSLS SLSTmp;
    if (!config.mProject.empty()) {
        SLSTmp.mProject = config.mProject;
        SLSTmp.mLogstore = config.mLogstore;
        SLSTmp.mRegion = config.mRegion;
        mContext.SetSLSInfo(&SLSTmp);
    }

    mPluginID.store(0);
    mInProcessCnt.store(0);
    for (size_t i = 0; i < config.mInputs.size(); ++i) {
        const Json::Value& detail = *config.mInputs[i];
        string pluginType = detail["Type"].asString();
        unique_ptr<InputInstance> input
            = PluginRegistry::GetInstance()->CreateInput(pluginType, GenNextPluginMeta(false));
        if (input) {
            Json::Value optionalGoPipeline;
            if (!input->Init(detail, mContext, i, optionalGoPipeline)) {
                return false;
            }
            mInputs.emplace_back(std::move(input));
            if (!optionalGoPipeline.isNull()) {
                MergeGoPipeline(optionalGoPipeline, mGoPipelineWithInput);
            }
            // for special treatment below
            // The plugin of type pluginType is the one just appended, i.e. mInputs.back(). Casting mInputs[0] is
            // only valid when that plugin happens to be the first native input; otherwise the pointer would refer
            // to an object of another type. The first match is kept when a type appears more than once.
            if (pluginType == InputFile::sName) {
                if (inputFile == nullptr) {
                    inputFile = static_cast<const InputFile*>(mInputs.back()->GetPlugin());
                }
            } else if (pluginType == InputContainerStdio::sName) {
                if (inputContainerStdio == nullptr) {
                    inputContainerStdio = static_cast<const InputContainerStdio*>(mInputs.back()->GetPlugin());
                }
            }
        } else {
            // not creatable by the registry: hand the plugin over to the Go pipeline
            AddPluginToGoPipeline(pluginType, detail, "inputs", mGoPipelineWithInput);
        }
    }

    for (size_t i = 0; i < config.mProcessors.size(); ++i) {
        const Json::Value& detail = *config.mProcessors[i];
        string pluginType = detail["Type"].asString();
        unique_ptr<ProcessorInstance> processor
            = PluginRegistry::GetInstance()->CreateProcessor(pluginType, GenNextPluginMeta(false));
        if (processor) {
            if (!processor->Init(detail, mContext)) {
                return false;
            }
            mProcessorLine.emplace_back(std::move(processor));
            // for special treatment of topicformat in apsara mode
            if (i == 0 && pluginType == ProcessorParseApsaraNative::sName) {
                mContext.SetIsFirstProcessorApsaraFlag(true);
            }
        } else {
            if (ShouldAddPluginToGoPipelineWithInput()) {
                AddPluginToGoPipeline(pluginType, detail, "processors", mGoPipelineWithInput);
            } else {
                AddPluginToGoPipeline(pluginType, detail, "processors", mGoPipelineWithoutInput);
            }
        }
    }

    if (config.mAggregators.empty() && config.IsFlushingThroughGoPipelineExisted()) {
        // an aggregator_default plugin will be added to the go pipeline when mAggregators is empty and go data needs
        // to be sent to a cpp flusher.
        config.mAggregators.push_back(AggregatorDefaultConfig::Instance().GetJsonConfig());
    }
    for (size_t i = 0; i < config.mAggregators.size(); ++i) {
        const Json::Value& detail = *config.mAggregators[i];
        string pluginType = detail["Type"].asString();
        // aggregators are always added to a Go pipeline; the call is made only to advance the plugin meta sequence
        GenNextPluginMeta(false);
        if (ShouldAddPluginToGoPipelineWithInput()) {
            AddPluginToGoPipeline(pluginType, detail, "aggregators", mGoPipelineWithInput);
        } else {
            AddPluginToGoPipeline(pluginType, detail, "aggregators", mGoPipelineWithoutInput);
        }
    }

    for (size_t i = 0; i < config.mFlushers.size(); ++i) {
        const Json::Value& detail = *config.mFlushers[i];
        string pluginType = detail["Type"].asString();
        unique_ptr<FlusherInstance> flusher
            = PluginRegistry::GetInstance()->CreateFlusher(pluginType, GenNextPluginMeta(false));
        if (flusher) {
            Json::Value optionalGoPipeline;
            if (!flusher->Init(detail, mContext, i, optionalGoPipeline)) {
                return false;
            }
            mFlushers.emplace_back(std::move(flusher));
            if (!optionalGoPipeline.isNull() && config.ShouldNativeFlusherConnectedByGoPipeline()) {
                if (ShouldAddPluginToGoPipelineWithInput()) {
                    MergeGoPipeline(optionalGoPipeline, mGoPipelineWithInput);
                } else {
                    MergeGoPipeline(optionalGoPipeline, mGoPipelineWithoutInput);
                }
            }
            if (pluginType == FlusherSLS::sName) {
                hasFlusherSLS = true;
                // replace the temporary SLS info set at the top of this function with the real plugin
                mContext.SetSLSInfo(static_cast<const FlusherSLS*>(mFlushers.back()->GetPlugin()));
            }
        } else {
            if (ShouldAddPluginToGoPipelineWithInput()) {
                AddPluginToGoPipeline(pluginType, detail, "flushers", mGoPipelineWithInput);
            } else {
                AddPluginToGoPipeline(pluginType, detail, "flushers", mGoPipelineWithoutInput);
            }
        }
    }

    // route is only enabled in native flushing mode, thus the index in config is the same as that in mFlushers
    if (!mRouter.Init(config.mRouter, mContext)) {
        return false;
    }

    for (size_t i = 0; i < config.mExtensions.size(); ++i) {
        const Json::Value& detail = *config.mExtensions[i];
        string pluginType = detail["Type"].asString();
        GenNextPluginMeta(false);
        // extensions are attached to every Go pipeline that has been populated so far
        if (!mGoPipelineWithInput.isNull()) {
            AddPluginToGoPipeline(pluginType, detail, "extensions", mGoPipelineWithInput);
        }
        if (!mGoPipelineWithoutInput.isNull()) {
            AddPluginToGoPipeline(pluginType, detail, "extensions", mGoPipelineWithoutInput);
        }
    }

    // global module must be initialized at last, since native input or flusher plugin may generate global param in Go
    // pipeline, which should be overridden by explicitly provided global module.
    if (config.mGlobal) {
        Json::Value extendedParams;
        if (!mContext.InitGlobalConfig(*config.mGlobal, extendedParams)) {
            return false;
        }
        AddExtendedGlobalParamToGoPipeline(extendedParams, mGoPipelineWithInput);
        AddExtendedGlobalParamToGoPipeline(extendedParams, mGoPipelineWithoutInput);
    }
    CopyNativeGlobalParamToGoPipeline(mGoPipelineWithInput);
    CopyNativeGlobalParamToGoPipeline(mGoPipelineWithoutInput);

    if (config.ShouldAddProcessorTagNative()) {
        unique_ptr<ProcessorInstance> processor
            = PluginRegistry::GetInstance()->CreateProcessor(ProcessorTagNative::sName, GenNextPluginMeta(false));
        Json::Value detail;
        if (config.mGlobal) {
            detail = *config.mGlobal;
        }
        // the registry returns an empty pointer for types it cannot create (see the loops above); guard against it
        // here as well instead of dereferencing a null pointer
        if (!processor || !processor->Init(detail, mContext)) {
            // should not happen
            return false;
        }
        mPipelineInnerProcessorLine.emplace_back(std::move(processor));
    } else {
        // processor tag requires tags as input, so it is a special processor, cannot add as plugin
        if (!mGoPipelineWithInput.isNull()) {
            CopyTagParamToGoPipeline(mGoPipelineWithInput, config.mGlobal);
        }
        if (!mGoPipelineWithoutInput.isNull()) {
            CopyTagParamToGoPipeline(mGoPipelineWithoutInput, config.mGlobal);
        }
    }

    // mandatory override global.DefaultLogQueueSize in Go pipeline when input_file and Go processing coexist.
    if ((inputFile != nullptr || inputContainerStdio != nullptr) && IsFlushingThroughGoPipeline()) {
        mGoPipelineWithoutInput["global"]["DefaultLogQueueSize"]
            = Json::Value(INT32_FLAG(default_plugin_log_queue_size));
    }

    // special treatment for exactly once
    if (inputFile && inputFile->mExactlyOnceConcurrency > 0) {
        if (mInputs.size() > 1) {
            PARAM_ERROR_RETURN(mContext.GetLogger(),
                               mContext.GetAlarm(),
                               "exactly once enabled when input other than input_file is given",
                               noModule,
                               mName,
                               mContext.GetProjectName(),
                               mContext.GetLogstoreName(),
                               mContext.GetRegion());
        }
        if (mFlushers.size() > 1 || !hasFlusherSLS) {
            PARAM_ERROR_RETURN(mContext.GetLogger(),
                               mContext.GetAlarm(),
                               "exactly once enabled when flusher other than flusher_sls is given",
                               noModule,
                               mName,
                               mContext.GetProjectName(),
                               mContext.GetLogstoreName(),
                               mContext.GetRegion());
        }
        if (IsFlushingThroughGoPipeline()) {
            PARAM_ERROR_RETURN(mContext.GetLogger(),
                               mContext.GetAlarm(),
                               "exactly once enabled when not in native mode",
                               noModule,
                               mName,
                               mContext.GetProjectName(),
                               mContext.GetLogstoreName(),
                               mContext.GetRegion());
        }
    }

#ifndef APSARA_UNIT_TEST_MAIN
    if (!LoadGoPipelines()) {
        return false;
    }
#endif

    // Process queue, not generated when exactly once is enabled
    if (!inputFile || inputFile->mExactlyOnceConcurrency == 0) {
        // -1 means no process queue key has been assigned to the context yet
        if (mContext.GetProcessQueueKey() == -1) {
            mContext.SetProcessQueueKey(QueueKeyManager::GetInstance()->GetKey(mName));
        }

        // TODO: for go input, we currently assume bounded process queue
        bool isInputSupportAck = mInputs.empty() ? true : mInputs[0]->SupportAck();
        for (auto& input : mInputs) {
            if (input->SupportAck() != isInputSupportAck) {
                PARAM_ERROR_RETURN(mContext.GetLogger(),
                                   mContext.GetAlarm(),
                                   "not all inputs' ack support are the same",
                                   noModule,
                                   mName,
                                   mContext.GetProjectName(),
                                   mContext.GetLogstoreName(),
                                   mContext.GetRegion());
            }
        }
        // inputs that support ack get a bounded queue, the others a circular queue created with the argument 1024
        if (isInputSupportAck) {
            ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(
                mContext.GetProcessQueueKey(), mContext.GetGlobalConfig().mPriority, mContext);
        } else {
            ProcessQueueManager::GetInstance()->CreateOrUpdateCircularQueue(
                mContext.GetProcessQueueKey(), mContext.GetGlobalConfig().mPriority, 1024, mContext);
        }


        // collect the distinct feedback interfaces registered for the native inputs of this pipeline
        unordered_set<FeedbackInterface*> feedbackSet;
        for (const auto& input : mInputs) {
            FeedbackInterface* feedback
                = InputFeedbackInterfaceRegistry::GetInstance()->GetFeedbackInterface(input->Name());
            if (feedback != nullptr) {
                feedbackSet.insert(feedback);
            }
        }
        ProcessQueueManager::GetInstance()->SetFeedbackInterface(
            mContext.GetProcessQueueKey(), vector<FeedbackInterface*>(feedbackSet.begin(), feedbackSet.end()));

        // the sender queue of every native flusher becomes a downstream queue of the process queue
        vector<BoundedSenderQueueInterface*> senderQueues;
        for (const auto& flusher : mFlushers) {
            senderQueues.push_back(SenderQueueManager::GetInstance()->GetQueue(flusher->GetQueueKey()));
        }
        ProcessQueueManager::GetInstance()->SetDownStreamQueues(mContext.GetProcessQueueKey(), std::move(senderQueues));
    }

    // pipeline-level metrics, labelled with project, pipeline name and logstore
    WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef,
                                                         MetricCategory::METRIC_CATEGORY_PIPELINE,
                                                         {{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()},
                                                          {METRIC_LABEL_KEY_PIPELINE_NAME, mName},
                                                          {METRIC_LABEL_KEY_LOGSTORE, mContext.GetLogstoreName()}});
    mStartTime = mMetricsRecordRef.CreateIntGauge(METRIC_PIPELINE_START_TIME);
    mProcessorsInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL);
    mProcessorsInGroupsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_TOTAL);
    mProcessorsInSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_SIZE_BYTES);
    mProcessorsTotalProcessTimeMs
        = mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_PROCESSORS_TOTAL_PROCESS_TIME_MS);
    mFlushersInGroupsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL);
    mFlushersInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL);
    mFlushersInSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES);
    mFlushersTotalPackageTimeMs = mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS);

    return true;
}