in core/collection_pipeline/CollectionPipeline.cpp [75:359]
/**
 * Builds this pipeline from a parsed collection config.
 *
 * Native (C++) inputs, processors and flushers are created through PluginRegistry and initialized in place. Any
 * plugin the registry cannot create is appended to one of the Go pipeline configs (mGoPipelineWithInput /
 * mGoPipelineWithoutInput). All aggregators and extensions also go to the Go pipeline configs. Then:
 *   - the global config is applied;
 *   - exactly-once constraints are validated;
 *   - Go pipelines are loaded (skipped when built with APSARA_UNIT_TEST_MAIN);
 *   - unless exactly once is enabled, the process queue is created and wired to its feedback interfaces and to the
 *     flushers' sender queues;
 *   - the pipeline-level metrics are registered.
 *
 * @param config Parsed collection config. It is consumed: config.mDetail is moved into mConfig, and
 *               config.mAggregators may be extended.
 * @return true on success. Returns false as soon as a plugin, the router or the global config fails to initialize,
 *         or when a parameter constraint reported through PARAM_ERROR_RETURN is violated.
 */
bool CollectionPipeline::Init(CollectionConfig&& config) {
    mName = config.mName;
    mConfig = std::move(config.mDetail);
    mSingletonInput = config.mSingletonInput;
    mContext.SetConfigName(mName);
    mContext.SetCreateTime(config.mCreateTime);
    mContext.SetPipeline(*this);
    mContext.SetIsFirstProcessorJsonFlag(config.mIsFirstProcessorJson);
    mContext.SetHasNativeProcessorsFlag(config.mHasNativeProcessor);
    mContext.SetIsFlushingThroughGoPipelineFlag(config.IsFlushingThroughGoPipelineExisted());

    // for special treatment below
    const InputFile* inputFile = nullptr;
    const InputContainerStdio* inputContainerStdio = nullptr;
    bool hasFlusherSLS = false;

    // A temporary object carries the SLS project/logstore/region in the context. This lets alarms be sent and the
    // MetricsRecord be initialized before flusher_sls is built. The real flusher_sls instance replaces it once that
    // flusher is built below. SLSTmp lives on this stack frame, so the context must not keep pointing at it after
    // a successful return (see the reset right before "return true").
    FlusherSLS SLSTmp;
    if (!config.mProject.empty()) {
        SLSTmp.mProject = config.mProject;
        SLSTmp.mLogstore = config.mLogstore;
        SLSTmp.mRegion = config.mRegion;
        mContext.SetSLSInfo(&SLSTmp);
    }

    mPluginID.store(0);
    mInProcessCnt.store(0);

    // Inputs: native ones are initialized here; all others are handed over to the Go pipeline with input.
    for (size_t i = 0; i < config.mInputs.size(); ++i) {
        const Json::Value& detail = *config.mInputs[i];
        string pluginType = detail["Type"].asString();
        unique_ptr<InputInstance> input
            = PluginRegistry::GetInstance()->CreateInput(pluginType, GenNextPluginMeta(false));
        if (input) {
            Json::Value optionalGoPipeline;
            if (!input->Init(detail, mContext, i, optionalGoPipeline)) {
                return false;
            }
            mInputs.emplace_back(std::move(input));
            if (!optionalGoPipeline.isNull()) {
                MergeGoPipeline(optionalGoPipeline, mGoPipelineWithInput);
            }
            // For the special treatment below. Cast the input that was just added: it is not necessarily
            // mInputs[0], and casting another plugin to these types would be undefined behavior.
            if (pluginType == InputFile::sName) {
                inputFile = static_cast<const InputFile*>(mInputs.back()->GetPlugin());
            } else if (pluginType == InputContainerStdio::sName) {
                inputContainerStdio = static_cast<const InputContainerStdio*>(mInputs.back()->GetPlugin());
            }
        } else {
            AddPluginToGoPipeline(pluginType, detail, "inputs", mGoPipelineWithInput);
        }
    }

    // Processors: native ones go to mProcessorLine; the rest go to whichever Go pipeline is in use.
    for (size_t i = 0; i < config.mProcessors.size(); ++i) {
        const Json::Value& detail = *config.mProcessors[i];
        string pluginType = detail["Type"].asString();
        unique_ptr<ProcessorInstance> processor
            = PluginRegistry::GetInstance()->CreateProcessor(pluginType, GenNextPluginMeta(false));
        if (processor) {
            if (!processor->Init(detail, mContext)) {
                return false;
            }
            mProcessorLine.emplace_back(std::move(processor));
            // for special treatment of topicformat in apsara mode
            if (i == 0 && pluginType == ProcessorParseApsaraNative::sName) {
                mContext.SetIsFirstProcessorApsaraFlag(true);
            }
        } else {
            if (ShouldAddPluginToGoPipelineWithInput()) {
                AddPluginToGoPipeline(pluginType, detail, "processors", mGoPipelineWithInput);
            } else {
                AddPluginToGoPipeline(pluginType, detail, "processors", mGoPipelineWithoutInput);
            }
        }
    }

    if (config.mAggregators.empty() && config.IsFlushingThroughGoPipelineExisted()) {
        // An aggregator_default plugin is added to the Go pipeline when mAggregators is empty and Go data needs to
        // be sent to a cpp flusher.
        config.mAggregators.push_back(AggregatorDefaultConfig::Instance().GetJsonConfig());
    }
    // Aggregators always live in the Go pipeline; GenNextPluginMeta is still called so plugin IDs keep advancing.
    for (size_t i = 0; i < config.mAggregators.size(); ++i) {
        const Json::Value& detail = *config.mAggregators[i];
        string pluginType = detail["Type"].asString();
        GenNextPluginMeta(false);
        if (ShouldAddPluginToGoPipelineWithInput()) {
            AddPluginToGoPipeline(pluginType, detail, "aggregators", mGoPipelineWithInput);
        } else {
            AddPluginToGoPipeline(pluginType, detail, "aggregators", mGoPipelineWithoutInput);
        }
    }

    // Flushers: native ones go to mFlushers; a native flusher_sls also replaces the temporary SLS info in context.
    for (size_t i = 0; i < config.mFlushers.size(); ++i) {
        const Json::Value& detail = *config.mFlushers[i];
        string pluginType = detail["Type"].asString();
        unique_ptr<FlusherInstance> flusher
            = PluginRegistry::GetInstance()->CreateFlusher(pluginType, GenNextPluginMeta(false));
        if (flusher) {
            Json::Value optionalGoPipeline;
            if (!flusher->Init(detail, mContext, i, optionalGoPipeline)) {
                return false;
            }
            mFlushers.emplace_back(std::move(flusher));
            if (!optionalGoPipeline.isNull() && config.ShouldNativeFlusherConnectedByGoPipeline()) {
                if (ShouldAddPluginToGoPipelineWithInput()) {
                    MergeGoPipeline(optionalGoPipeline, mGoPipelineWithInput);
                } else {
                    MergeGoPipeline(optionalGoPipeline, mGoPipelineWithoutInput);
                }
            }
            if (pluginType == FlusherSLS::sName) {
                hasFlusherSLS = true;
                mContext.SetSLSInfo(static_cast<const FlusherSLS*>(mFlushers.back()->GetPlugin()));
            }
        } else {
            if (ShouldAddPluginToGoPipelineWithInput()) {
                AddPluginToGoPipeline(pluginType, detail, "flushers", mGoPipelineWithInput);
            } else {
                AddPluginToGoPipeline(pluginType, detail, "flushers", mGoPipelineWithoutInput);
            }
        }
    }

    // route is only enabled in native flushing mode, thus the index in config is the same as that in mFlushers
    if (!mRouter.Init(config.mRouter, mContext)) {
        return false;
    }

    // Extensions are added to every Go pipeline that exists at this point.
    for (size_t i = 0; i < config.mExtensions.size(); ++i) {
        const Json::Value& detail = *config.mExtensions[i];
        string pluginType = detail["Type"].asString();
        GenNextPluginMeta(false);
        if (!mGoPipelineWithInput.isNull()) {
            AddPluginToGoPipeline(pluginType, detail, "extensions", mGoPipelineWithInput);
        }
        if (!mGoPipelineWithoutInput.isNull()) {
            AddPluginToGoPipeline(pluginType, detail, "extensions", mGoPipelineWithoutInput);
        }
    }

    // The global module must be initialized last. Native input or flusher plugins may generate global params in
    // the Go pipeline, and those should be overridden by an explicitly provided global module.
    if (config.mGlobal) {
        Json::Value extendedParams;
        if (!mContext.InitGlobalConfig(*config.mGlobal, extendedParams)) {
            return false;
        }
        AddExtendedGlobalParamToGoPipeline(extendedParams, mGoPipelineWithInput);
        AddExtendedGlobalParamToGoPipeline(extendedParams, mGoPipelineWithoutInput);
    }
    CopyNativeGlobalParamToGoPipeline(mGoPipelineWithInput);
    CopyNativeGlobalParamToGoPipeline(mGoPipelineWithoutInput);

    if (config.ShouldAddProcessorTagNative()) {
        unique_ptr<ProcessorInstance> processor
            = PluginRegistry::GetInstance()->CreateProcessor(ProcessorTagNative::sName, GenNextPluginMeta(false));
        Json::Value detail;
        if (config.mGlobal) {
            detail = *config.mGlobal;
        }
        // Should not happen. A null instance is rejected the same way as a failed Init instead of being
        // dereferenced.
        if (!processor || !processor->Init(detail, mContext)) {
            return false;
        }
        mPipelineInnerProcessorLine.emplace_back(std::move(processor));
    } else {
        // processor tag requires tags as input, so it is a special processor, cannot add as plugin
        if (!mGoPipelineWithInput.isNull()) {
            CopyTagParamToGoPipeline(mGoPipelineWithInput, config.mGlobal);
        }
        if (!mGoPipelineWithoutInput.isNull()) {
            CopyTagParamToGoPipeline(mGoPipelineWithoutInput, config.mGlobal);
        }
    }

    // Mandatory override of global.DefaultLogQueueSize in the Go pipeline when input_file (or
    // input_container_stdio) and Go processing coexist.
    if ((inputFile != nullptr || inputContainerStdio != nullptr) && IsFlushingThroughGoPipeline()) {
        mGoPipelineWithoutInput["global"]["DefaultLogQueueSize"]
            = Json::Value(INT32_FLAG(default_plugin_log_queue_size));
    }

    // Special treatment for exactly once. It requires a single input_file, only flusher_sls, and native mode.
    if (inputFile && inputFile->mExactlyOnceConcurrency > 0) {
        if (mInputs.size() > 1) {
            PARAM_ERROR_RETURN(mContext.GetLogger(),
                               mContext.GetAlarm(),
                               "exactly once enabled when input other than input_file is given",
                               noModule,
                               mName,
                               mContext.GetProjectName(),
                               mContext.GetLogstoreName(),
                               mContext.GetRegion());
        }
        if (mFlushers.size() > 1 || !hasFlusherSLS) {
            PARAM_ERROR_RETURN(mContext.GetLogger(),
                               mContext.GetAlarm(),
                               "exactly once enabled when flusher other than flusher_sls is given",
                               noModule,
                               mName,
                               mContext.GetProjectName(),
                               mContext.GetLogstoreName(),
                               mContext.GetRegion());
        }
        if (IsFlushingThroughGoPipeline()) {
            PARAM_ERROR_RETURN(mContext.GetLogger(),
                               mContext.GetAlarm(),
                               "exactly once enabled when not in native mode",
                               noModule,
                               mName,
                               mContext.GetProjectName(),
                               mContext.GetLogstoreName(),
                               mContext.GetRegion());
        }
    }

#ifndef APSARA_UNIT_TEST_MAIN
    if (!LoadGoPipelines()) {
        return false;
    }
#endif

    // Process queue, not generated when exactly once is enabled
    if (!inputFile || inputFile->mExactlyOnceConcurrency == 0) {
        if (mContext.GetProcessQueueKey() == -1) {
            mContext.SetProcessQueueKey(QueueKeyManager::GetInstance()->GetKey(mName));
        }

        // TODO: for go input, we currently assume bounded process queue
        bool isInputSupportAck = mInputs.empty() ? true : mInputs[0]->SupportAck();
        for (auto& input : mInputs) {
            if (input->SupportAck() != isInputSupportAck) {
                PARAM_ERROR_RETURN(mContext.GetLogger(),
                                   mContext.GetAlarm(),
                                   "not all inputs' ack support are the same",
                                   noModule,
                                   mName,
                                   mContext.GetProjectName(),
                                   mContext.GetLogstoreName(),
                                   mContext.GetRegion());
            }
        }
        if (isInputSupportAck) {
            ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(
                mContext.GetProcessQueueKey(), mContext.GetGlobalConfig().mPriority, mContext);
        } else {
            ProcessQueueManager::GetInstance()->CreateOrUpdateCircularQueue(
                mContext.GetProcessQueueKey(), mContext.GetGlobalConfig().mPriority, 1024, mContext);
        }

        // Several inputs may share one feedback interface; the set deduplicates them.
        unordered_set<FeedbackInterface*> feedbackSet;
        for (const auto& input : mInputs) {
            FeedbackInterface* feedback
                = InputFeedbackInterfaceRegistry::GetInstance()->GetFeedbackInterface(input->Name());
            if (feedback != nullptr) {
                feedbackSet.insert(feedback);
            }
        }
        ProcessQueueManager::GetInstance()->SetFeedbackInterface(
            mContext.GetProcessQueueKey(), vector<FeedbackInterface*>(feedbackSet.begin(), feedbackSet.end()));

        // Each native flusher's sender queue becomes a downstream queue of the process queue.
        vector<BoundedSenderQueueInterface*> senderQueues;
        for (const auto& flusher : mFlushers) {
            senderQueues.push_back(SenderQueueManager::GetInstance()->GetQueue(flusher->GetQueueKey()));
        }
        ProcessQueueManager::GetInstance()->SetDownStreamQueues(mContext.GetProcessQueueKey(), std::move(senderQueues));
    }

    WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef,
                                                         MetricCategory::METRIC_CATEGORY_PIPELINE,
                                                         {{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()},
                                                          {METRIC_LABEL_KEY_PIPELINE_NAME, mName},
                                                          {METRIC_LABEL_KEY_LOGSTORE, mContext.GetLogstoreName()}});
    mStartTime = mMetricsRecordRef.CreateIntGauge(METRIC_PIPELINE_START_TIME);
    mProcessorsInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL);
    mProcessorsInGroupsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_TOTAL);
    mProcessorsInSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_SIZE_BYTES);
    mProcessorsTotalProcessTimeMs
        = mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_PROCESSORS_TOTAL_PROCESS_TIME_MS);
    mFlushersInGroupsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL);
    mFlushersInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL);
    mFlushersInSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES);
    mFlushersTotalPackageTimeMs = mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS);

    // SLSTmp is destroyed when this function returns. If no native flusher_sls replaced it, the context would
    // otherwise keep a dangling pointer to it.
    // NOTE(review): the early "return false" paths still leave the context pointing at SLSTmp; this assumes callers
    // discard the pipeline when Init fails. Confirm.
    if (!hasFlusherSLS && !config.mProject.empty()) {
        mContext.SetSLSInfo(nullptr);
    }
    return true;
}