core/unittest/pipeline/PipelineUnittest.cpp (2,883 lines of code) (raw):

// Copyright 2023 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include <future> #include <memory> #include <string> #include <thread> #include "json/json.h" #include "app_config/AppConfig.h" #include "collection_pipeline/CollectionPipeline.h" #include "collection_pipeline/batch/TimeoutFlushManager.h" #include "collection_pipeline/plugin/PluginRegistry.h" #include "collection_pipeline/queue/BoundedProcessQueue.h" #include "collection_pipeline/queue/ProcessQueueManager.h" #include "collection_pipeline/queue/QueueKeyManager.h" #include "common/JsonUtil.h" #include "config/CollectionConfig.h" #include "plugin/input/InputFeedbackInterfaceRegistry.h" #include "plugin/processor/inner/ProcessorSplitLogStringNative.h" #include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h" #include "unittest/Unittest.h" #include "unittest/plugin/PluginMock.h" using namespace std; namespace logtail { class PipelineUnittest : public ::testing::Test { public: void OnSuccessfulInit() const; void OnFailedInit() const; void OnInitVariousTopology() const; void TestProcessQueue() const; void OnInputFileWithJsonMultiline() const; void OnInputFileWithContainerDiscovery() const; void TestProcess() const; void TestSend() const; void TestFlushBatch() const; void TestInProcessingCount() const; void TestWaitAllItemsInProcessFinished() const; void TestMultiFlusherAndRouter() const; protected: static void SetUpTestCase() { PluginRegistry::GetInstance()->LoadPlugins(); LoadPluginMock(); InputFeedbackInterfaceRegistry::GetInstance()->LoadFeedbackInterfaces(); AppConfig::GetInstance()->mPurageContainerMode = true; } static void TearDownTestCase() { PluginRegistry::GetInstance()->UnloadPlugins(); } void TearDown() override { TimeoutFlushManager::GetInstance()->mTimeoutRecords.clear(); QueueKeyManager::GetInstance()->Clear(); ProcessQueueManager::GetInstance()->Clear(); } unique_ptr<ProcessQueueItem> GenerateProcessItem(shared_ptr<CollectionPipeline> pipeline) const { PipelineEventGroup eventGroup(make_shared<SourceBuffer>()); auto item = make_unique<ProcessQueueItem>(std::move(eventGroup), 0); item->mPipeline = pipeline; return item; } private: const string configName = "test_config"; }; void PipelineUnittest::OnSuccessfulInit() const { unique_ptr<Json::Value> configJson; Json::Value goPipelineWithInput, goPipelineWithoutInput; string configStr, goPipelineWithInputStr, goPipelineWithoutInputStr, errorMsg; unique_ptr<CollectionConfig> config; unique_ptr<CollectionPipeline> pipeline; // with sls flusher configStr = R"( { "createTime": 123456789, "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(configName, pipeline->Name()); APSARA_TEST_EQUAL(configName, pipeline->GetContext().GetConfigName()); APSARA_TEST_EQUAL(123456789U, pipeline->GetContext().GetCreateTime()); APSARA_TEST_EQUAL("test_project", pipeline->GetContext().GetProjectName()); APSARA_TEST_EQUAL("test_logstore", pipeline->GetContext().GetLogstoreName()); APSARA_TEST_EQUAL("test_region", pipeline->GetContext().GetRegion()); APSARA_TEST_EQUAL(QueueKeyManager::GetInstance()->GetKey("test_config-flusher_sls-test_project#test_logstore"), pipeline->GetContext().GetLogstoreKey()); APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load()); APSARA_TEST_EQUAL(3U, pipeline->mMetricsRecordRef->GetLabels()->size()); APSARA_TEST_TRUE(pipeline->mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_PIPELINE_NAME, configName)); APSARA_TEST_TRUE(pipeline->mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_PROJECT, "test_project")); // without sls flusher configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "flushers": [ { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(configName, pipeline->Name()); APSARA_TEST_EQUAL(configName, pipeline->GetContext().GetConfigName()); APSARA_TEST_EQUAL(0U, pipeline->GetContext().GetCreateTime()); APSARA_TEST_EQUAL("", pipeline->GetContext().GetProjectName()); APSARA_TEST_EQUAL("", pipeline->GetContext().GetLogstoreName()); APSARA_TEST_EQUAL("", pipeline->GetContext().GetRegion()); APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load()); #ifndef __ENTERPRISE__ APSARA_TEST_EQUAL(QueueKeyManager::GetInstance()->GetKey("test_config-flusher_sls-"), pipeline->GetContext().GetLogstoreKey()); #endif // extensions and extended global param configStr = R"( { "global": { "DefaultLogGroupQueueSize": 3, "DefaultLogQueueSize": 5 }, "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ], "EnableContainerDiscovery": true, "CollectingContainersMeta": true } ], "flushers": [ { "Type": "flusher_http" } ], "extensions": [ { "Type": "ext_basicauth" } ] } )"; goPipelineWithInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 5, "DefaultLogGroupQueueSize": 3 }, "inputs": [ { "type": "metric_container_info/2", "detail": { "CollectingContainersMeta": true, "LogPath": "/home", "MaxDepth": 0, "FilePattern": "test.log" } } ], "extensions": [ { "type": "ext_basicauth/6", "detail": {} } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10, "DefaultLogGroupQueueSize": 3 }, "aggregators": [ { "type": "aggregator_default/4", "detail": {} } ], "flushers": [ { "type": "flusher_http/5", "detail": {} } ], "extensions": [ { "type": "ext_basicauth/6", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load()); goPipelineWithInput.clear(); goPipelineWithoutInput.clear(); // router configStr = R"( { "createTime": 123456789, "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore_1", "Region": "test_region", "Endpoint": "test_endpoint" }, { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore_2", "Region": "test_region", "Endpoint": "test_endpoint", "Match": { "Type": "event_type", "Value": "log" } } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mRouter.mConditions.size()); APSARA_TEST_EQUAL(1U, pipeline->mRouter.mAlwaysMatchedFlusherIdx.size()); APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load()); } void PipelineUnittest::OnFailedInit() const { unique_ptr<Json::Value> configJson; string configStr, errorMsg; unique_ptr<CollectionConfig> config; unique_ptr<CollectionPipeline> pipeline; // invalid input configStr = R"( { "inputs": [ { "Type": "input_file" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_FALSE(pipeline->Init(std::move(*config))); // invalid processor configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_parse_regex_native" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_FALSE(pipeline->Init(std::move(*config))); // invalid flusher configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "flushers": [ { "Type": "flusher_sls" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_FALSE(pipeline->Init(std::move(*config))); // invalid router configStr = R"( { "createTime": 123456789, "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore_1", "Region": "test_region", "Endpoint": "test_endpoint", "Match": "unknown" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_FALSE(pipeline->Init(std::move(*config))); // invalid inputs ack support configStr = R"( { "createTime": 123456789, "inputs": [ { "Type": "input_mock" }, { "Type": "input_mock", "SupportAck": false } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore_1", "Region": "test_region", "Endpoint": "test_endpoint", "Match": "unknown" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_FALSE(pipeline->Init(std::move(*config))); } void PipelineUnittest::OnInitVariousTopology() const { unique_ptr<Json::Value> configJson; Json::Value goPipelineWithInput, goPipelineWithoutInput; string configStr, goPipelineWithInputStr, goPipelineWithoutInputStr, errorMsg; unique_ptr<CollectionConfig> config; unique_ptr<CollectionPipeline> pipeline; // topology 1: native -> native -> native configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); // topology 2: extended -> native -> native configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 3: (native, extended) -> native -> native configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 4: native -> extended -> native configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize": 10, "EnableProcessorTag": true }, "processors": [ { "type": "processor_regex/3", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/4", "detail": {} } ], "flushers": [ { "type": "flusher_sls/5", "detail": { "EnableShardHash": false } } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithoutInput.clear(); // topology 5: extended -> extended -> native configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; goPipelineWithInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "EnableProcessorTag": true }, "inputs": [ { "type": "service_docker_stdout/1", "detail": {} } ], "processors": [ { "type": "processor_regex/2", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/3", "detail": {} } ], "flushers": [ { "type": "flusher_sls/4", "detail": { "EnableShardHash": false } } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(0U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithInput.clear(); // topology 6: (native, extended) -> extended -> native configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 7: native -> (native -> extended) -> native configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] }, { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10 }, "processors": [ { "type": "processor_regex/4", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/5", "detail": {} } ], "flushers": [ { "type": "flusher_sls/6", "detail": { "EnableShardHash": false } } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithoutInput.clear(); // topology 8: extended -> (native -> extended) -> native configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] }, { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 9: (native, extended) -> (native -> extended) -> native configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] }, { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 10: native -> none -> native configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); // topology 11: extended -> none -> native (future changes maybe applied) configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; goPipelineWithInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "EnableProcessorTag": true }, "inputs": [ { "type": "service_docker_stdout/1", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/2", "detail": {} } ], "flushers": [ { "type": "flusher_sls/3", "detail": { "EnableShardHash": false } } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(0U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithInput.clear(); // topology 12: (native, extended) -> none -> native configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 13: native -> native -> extended configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] } ], "flushers": [ { "Type": "flusher_http" } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10 }, "aggregators": [ { "type": "aggregator_default/4", "detail": {} } ], "flushers": [ { "type": "flusher_http/5", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithoutInput.clear(); // topology 14: extended -> native -> extended configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] } ], "flushers": [ { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 15: (native, extended) -> native -> extended configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] } ], "flushers": [ { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 16: native -> extended -> extended configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_http" } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10, "EnableProcessorTag": true }, "processors": [ { "type": "processor_regex/3", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/4", "detail": {} } ], "flushers": [ { "type": "flusher_http/5", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithoutInput.clear(); // topology 17: extended -> extended -> extended configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_http" } ] } )"; goPipelineWithInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "EnableProcessorTag": true }, "inputs": [ { "type": "service_docker_stdout/1", "detail": {} } ], "processors": [ { "type": "processor_regex/2", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/3", "detail": {} } ], "flushers": [ { "type": "flusher_http/4", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(0U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size()); APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull()); APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithInput.clear(); // topology 18: (native, extended) -> extended -> extended configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 19: native -> (native -> extended) -> extended configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] }, { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_http" } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10 }, "processors": [ { "type": "processor_regex/4", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/5", "detail": {} } ], "flushers": [ { "type": "flusher_http/6", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithoutInput.clear(); // topology 20: extended -> (native -> extended) -> extended configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] }, { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 21: (native, extended) -> (native -> extended) -> extended configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] }, { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 22: native -> none -> extended configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "flushers": [ { "Type": "flusher_http" } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10 }, "aggregators": [ { "type": "aggregator_default/3", "detail": {} } ], "flushers": [ { "type": "flusher_http/4", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithoutInput.clear(); // topology 23: extended -> none -> extended configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "flushers": [ { "Type": "flusher_http" } ] } )"; goPipelineWithInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "EnableProcessorTag": true }, "inputs": [ { "type": "service_docker_stdout/1", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/2", "detail": {} } ], "flushers": [ { "type": "flusher_http/3", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(0U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size()); APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull()); APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithInput.clear(); // topology 24: (native, extended) -> none -> extended configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "flushers": [ { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 25: native -> native -> (native, extended) (future changes maybe applied) configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10 }, "aggregators": [ { "type": "aggregator_default/4", "detail": {} } ], "flushers": [ { "type": "flusher_sls/5", "detail": { "EnableShardHash": false } }, { "type": "flusher_http/6", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithoutInput.clear(); // topology 26: extended -> native -> (native, extended) configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 27: (native, extended) -> native -> (native, extended) configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 28: native -> extended -> (native, extended) configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10, "EnableProcessorTag": true }, "processors": [ { "type": "processor_regex/3", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/4", "detail": {} } ], "flushers": [ { "type": "flusher_sls/5", "detail": { "EnableShardHash": false } }, { "type": "flusher_http/6", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithoutInput.clear(); // topology 29: extended -> extended -> (native, extended) configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; goPipelineWithInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "EnableProcessorTag": true }, "inputs": [ { "type": "service_docker_stdout/1", "detail": {} } ], "processors": [ { "type": "processor_regex/2", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/3", "detail": {} } ], "flushers": [ { "type": "flusher_sls/4", "detail": { "EnableShardHash": false } }, { "type": "flusher_http/5", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(0U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithInput.clear(); // topology 30: (native, extended) -> extended -> (native, extended) configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 31: native -> (native -> extended) -> (native, extended) configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] }, { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10 }, "processors": [ { "type": "processor_regex/4", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/5", "detail": {} } ], "flushers": [ { "type": "flusher_sls/6", "detail": { "EnableShardHash": false } }, { "type": "flusher_http/7", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithoutInput.clear(); // topology 32: extended -> (native -> extended) -> (native, extended) configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] }, { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 33: (native, extended) -> (native -> extended) -> (native, extended) configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "processors": [ { "Type": "processor_parse_regex_native", "SourceKey": "content", "Regex": ".*", "Keys": ["key"] }, { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); // topology 34: native -> none -> (native, extended) (future changes maybe applied) configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10 }, "aggregators": [ { "type": "aggregator_default/3", "detail": {} } ], "flushers": [ { "type": "flusher_sls/4", "detail": { "EnableShardHash": false } }, { "type": "flusher_http/5", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(1U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithoutInput.clear(); // topology 35: extended -> none -> (native, extended) configStr = R"( { "inputs": [ { "Type": "service_docker_stdout" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; goPipelineWithInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "EnableProcessorTag": true }, "inputs": [ { "type": "service_docker_stdout/1", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/2", "detail": {} } ], "flushers": [ { "type": "flusher_sls/3", "detail": { "EnableShardHash": false } }, { "type": "flusher_http/4", "detail": {} } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(0U, pipeline->mInputs.size()); APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size()); APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size()); APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size()); APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull()); APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo()); goPipelineWithInput.clear(); // topology 36: (native, extended) -> none -> (native, extended) configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] }, { "Type": "service_docker_stdout" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false }, { "Type": "flusher_http" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_FALSE(config->Parse()); } void PipelineUnittest::TestProcessQueue() const { unique_ptr<Json::Value> configJson; string configStr, errorMsg; unique_ptr<CollectionConfig> config; unique_ptr<CollectionPipeline> pipeline; QueueKey key; ProcessQueueManager::ProcessQueueIterator que; // new pipeline configStr = R"( { "global": { "Priority": 0 }, "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); key = QueueKeyManager::GetInstance()->GetKey(configName); que = ProcessQueueManager::GetInstance()->mQueues[key].first; APSARA_TEST_EQUAL(ProcessQueueManager::QueueType::BOUNDED, ProcessQueueManager::GetInstance()->mQueues[key].second); // queue level APSARA_TEST_EQUAL(configName, (*que)->GetConfigName()); APSARA_TEST_EQUAL(key, (*que)->GetKey()); APSARA_TEST_EQUAL(0U, (*que)->GetPriority()); APSARA_TEST_EQUAL(1U, static_cast<BoundedProcessQueue*>(que->get())->mUpStreamFeedbacks.size()); APSARA_TEST_EQUAL(InputFeedbackInterfaceRegistry::GetInstance()->GetFeedbackInterface("input_file"), static_cast<BoundedProcessQueue*>(que->get())->mUpStreamFeedbacks[0]); APSARA_TEST_EQUAL(1U, (*que)->mDownStreamQueues.size()); // pipeline level APSARA_TEST_EQUAL(key, pipeline->GetContext().GetProcessQueueKey()); // manager level APSARA_TEST_EQUAL(1U, ProcessQueueManager::GetInstance()->mQueues.size()); APSARA_TEST_EQUAL(1U, ProcessQueueManager::GetInstance()->mPriorityQueue[0].size()); APSARA_TEST_TRUE(ProcessQueueManager::GetInstance()->mPriorityQueue[0].begin() == ProcessQueueManager::GetInstance()->mQueues[key].first); // update pipeline with different priority configStr = R"( { "inputs": [ { "Type": "input_mock" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); key = QueueKeyManager::GetInstance()->GetKey(configName); que = ProcessQueueManager::GetInstance()->mQueues[key].first; APSARA_TEST_EQUAL(ProcessQueueManager::QueueType::BOUNDED, ProcessQueueManager::GetInstance()->mQueues[key].second); // queue level APSARA_TEST_EQUAL(configName, (*que)->GetConfigName()); APSARA_TEST_EQUAL(key, (*que)->GetKey()); APSARA_TEST_EQUAL(1U, (*que)->GetPriority()); APSARA_TEST_EQUAL(1U, (*que)->mDownStreamQueues.size()); // pipeline level APSARA_TEST_EQUAL(key, pipeline->GetContext().GetProcessQueueKey()); // manager level APSARA_TEST_EQUAL(1U, ProcessQueueManager::GetInstance()->mQueues.size()); APSARA_TEST_EQUAL(1U, ProcessQueueManager::GetInstance()->mPriorityQueue[1].size()); APSARA_TEST_TRUE(ProcessQueueManager::GetInstance()->mPriorityQueue[1].begin() == ProcessQueueManager::GetInstance()->mQueues[key].first); // update pipeline with different type configStr = R"( { "inputs": [ { "Type": "input_mock", "SupportAck": false }, { "Type": "input_mock", "SupportAck": false } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint" } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); key = QueueKeyManager::GetInstance()->GetKey(configName); que = ProcessQueueManager::GetInstance()->mQueues[key].first; APSARA_TEST_EQUAL(ProcessQueueManager::QueueType::CIRCULAR, ProcessQueueManager::GetInstance()->mQueues[key].second); // queue level APSARA_TEST_EQUAL(configName, (*que)->GetConfigName()); APSARA_TEST_EQUAL(key, (*que)->GetKey()); APSARA_TEST_EQUAL(1U, (*que)->GetPriority()); APSARA_TEST_EQUAL(1U, (*que)->mDownStreamQueues.size()); // pipeline level APSARA_TEST_EQUAL(key, pipeline->GetContext().GetProcessQueueKey()); // manager level APSARA_TEST_EQUAL(1U, ProcessQueueManager::GetInstance()->mQueues.size()); APSARA_TEST_EQUAL(1U, ProcessQueueManager::GetInstance()->mPriorityQueue[1].size()); APSARA_TEST_TRUE(ProcessQueueManager::GetInstance()->mPriorityQueue[1].begin() == ProcessQueueManager::GetInstance()->mQueues[key].first); // delete pipeline pipeline->RemoveProcessQueue(); pipeline.reset(); APSARA_TEST_EQUAL(0U, ProcessQueueManager::GetInstance()->mQueues.size()); APSARA_TEST_EQUAL("", QueueKeyManager::GetInstance()->GetName(key)); } void PipelineUnittest::OnInputFileWithJsonMultiline() const { unique_ptr<Json::Value> configJson; string configStr, errorMsg; unique_ptr<CollectionConfig> config; unique_ptr<CollectionPipeline> pipeline; // first processor is native json parser configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_parse_json_native", "SourceKey": "content" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_TRUE(pipeline->GetContext().RequiringJsonReader()); APSARA_TEST_EQUAL(ProcessorSplitLogStringNative::sName, pipeline->mInputs[0]->GetInnerProcessors()[0]->Name()); // first processor is extended json parser configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "processors": [ { "Type": "processor_json" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_TRUE(pipeline->GetContext().RequiringJsonReader()); APSARA_TEST_EQUAL(ProcessorSplitLogStringNative::sName, pipeline->mInputs[0]->GetInnerProcessors()[0]->Name()); } void PipelineUnittest::OnInputFileWithContainerDiscovery() const { unique_ptr<Json::Value> configJson; Json::Value goPipelineWithInput, goPipelineWithoutInput; string configStr, goPipelineWithoutInputStr, goPipelineWithInputStr, errorMsg; unique_ptr<CollectionConfig> config; unique_ptr<CollectionPipeline> pipeline; // native processing configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ], "EnableContainerDiscovery": true, "CollectingContainersMeta": true } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; goPipelineWithInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10 }, "inputs": [ { "type": "metric_container_info/2", "detail": { "CollectingContainersMeta": true, "LogPath": "/home", "MaxDepth": 0, "FilePattern": "test.log" } } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString()); APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull()); goPipelineWithInput.clear(); // mixed processing configStr = R"( { "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ], "EnableContainerDiscovery": true, "CollectingContainersMeta": true } ], "processors": [ { "Type": "processor_regex" } ], "flushers": [ { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_endpoint", "EnableShardHash": false } ] } )"; goPipelineWithInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10, "EnableProcessorTag": true }, "inputs": [ { "type": "metric_container_info/2", "detail": { "CollectingContainersMeta": true, "LogPath": "/home", "MaxDepth": 0, "FilePattern": "test.log" } } ] } )"; goPipelineWithoutInputStr = R"( { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10, "EnableProcessorTag": true }, "processors": [ { "type": "processor_regex/4", "detail": {} } ], "aggregators": [ { "type": "aggregator_default/5", "detail": {} } ], "flushers": [ { "type": "flusher_sls/6", "detail": { "EnableShardHash": false } } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString()); APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString()); goPipelineWithInput.clear(); goPipelineWithoutInput.clear(); } void PipelineUnittest::TestProcess() const { CollectionPipeline pipeline; pipeline.mPluginID.store(0); CollectionPipelineContext ctx; ctx.SetPipeline(pipeline); Json::Value tmp; auto input = PluginRegistry::GetInstance()->CreateInput(InputMock::sName, pipeline.GenNextPluginMeta(false)); input->Init(Json::Value(), ctx, 0, tmp); pipeline.mInputs.emplace_back(std::move(input)); auto processor = PluginRegistry::GetInstance()->CreateProcessor(ProcessorMock::sName, pipeline.GenNextPluginMeta(false)); processor->Init(Json::Value(), ctx); pipeline.mProcessorLine.emplace_back(std::move(processor)); WriteMetrics::GetInstance()->PrepareMetricsRecordRef( pipeline.mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_UNKNOWN, {}); pipeline.mProcessorsInEventsTotal = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL); pipeline.mProcessorsInGroupsTotal = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_TOTAL); pipeline.mProcessorsInSizeBytes = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_SIZE_BYTES); pipeline.mProcessorsTotalProcessTimeMs = pipeline.mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_PROCESSORS_TOTAL_PROCESS_TIME_MS); vector<PipelineEventGroup> groups; groups.emplace_back(make_shared<SourceBuffer>()); groups.back().AddLogEvent(); auto size = groups.back().DataSize(); pipeline.Process(groups, 0); APSARA_TEST_EQUAL( 1U, static_cast<const ProcessorInnerMock*>(pipeline.mInputs[0]->GetInnerProcessors()[0]->mPlugin.get())->mCnt); APSARA_TEST_EQUAL(1U, static_cast<const ProcessorMock*>(pipeline.mProcessorLine[0]->mPlugin.get())->mCnt); APSARA_TEST_EQUAL(1U, pipeline.mProcessorsInEventsTotal->GetValue()); APSARA_TEST_EQUAL(1U, pipeline.mProcessorsInGroupsTotal->GetValue()); APSARA_TEST_EQUAL(size, pipeline.mProcessorsInSizeBytes->GetValue()); } void PipelineUnittest::TestSend() const { { // no route CollectionPipeline pipeline; pipeline.mPluginID.store(0); CollectionPipelineContext ctx; ctx.SetPipeline(pipeline); Json::Value tmp; { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } vector<pair<size_t, const Json::Value*>> configs; configs.emplace_back(0, nullptr); configs.emplace_back(1, nullptr); pipeline.mRouter.Init(configs, ctx); WriteMetrics::GetInstance()->PrepareMetricsRecordRef( pipeline.mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_UNKNOWN, {}); pipeline.mFlushersInGroupsTotal = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL); pipeline.mFlushersInEventsTotal = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL); pipeline.mFlushersInSizeBytes = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES); pipeline.mFlushersTotalPackageTimeMs = pipeline.mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS); { // all valid vector<PipelineEventGroup> group; group.emplace_back(make_shared<SourceBuffer>()); group.back().AddLogEvent(); APSARA_TEST_TRUE(pipeline.Send(std::move(group))); } { // some flusher not valid const_cast<FlusherMock*>(static_cast<const FlusherMock*>(pipeline.mFlushers[0]->GetPlugin()))->mIsValid = false; vector<PipelineEventGroup> group; group.emplace_back(make_shared<SourceBuffer>()); group.back().AddLogEvent(); APSARA_TEST_FALSE(pipeline.Send(std::move(group))); const_cast<FlusherMock*>(static_cast<const FlusherMock*>(pipeline.mFlushers[0]->GetPlugin()))->mIsValid = true; } } { // with route CollectionPipeline pipeline; pipeline.mPluginID.store(0); CollectionPipelineContext ctx; ctx.SetPipeline(pipeline); Json::Value tmp; { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } Json::Value configJson; string errorMsg; string configStr = R"( [ { "Type": "event_type", "Value": "log" } ] )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); vector<pair<size_t, const Json::Value*>> configs; for (Json::Value::ArrayIndex i = 0; i < configJson.size(); ++i) { configs.emplace_back(i, &configJson[i]); } configs.emplace_back(configJson.size(), nullptr); pipeline.mRouter.Init(configs, ctx); WriteMetrics::GetInstance()->PrepareMetricsRecordRef( pipeline.mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_UNKNOWN, {}); pipeline.mFlushersInGroupsTotal = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL); pipeline.mFlushersInEventsTotal = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL); pipeline.mFlushersInSizeBytes = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES); pipeline.mFlushersTotalPackageTimeMs = pipeline.mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS); { vector<PipelineEventGroup> group; group.emplace_back(make_shared<SourceBuffer>()); group[0].AddLogEvent(); APSARA_TEST_TRUE(pipeline.Send(std::move(group))); } { const_cast<FlusherMock*>(static_cast<const FlusherMock*>(pipeline.mFlushers[0]->GetPlugin()))->mIsValid = false; vector<PipelineEventGroup> group; group.emplace_back(make_shared<SourceBuffer>()); group[0].AddMetricEvent(); APSARA_TEST_TRUE(pipeline.Send(std::move(group))); const_cast<FlusherMock*>(static_cast<const FlusherMock*>(pipeline.mFlushers[0]->GetPlugin()))->mIsValid = true; } } } void PipelineUnittest::TestFlushBatch() const { CollectionPipeline pipeline; pipeline.mName = configName; pipeline.mPluginID.store(0); CollectionPipelineContext ctx; ctx.SetPipeline(pipeline); Json::Value tmp; { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } { // all successful TimeoutFlushManager::GetInstance()->UpdateRecord(configName, 0, 1, 3, nullptr); TimeoutFlushManager::GetInstance()->UpdateRecord(configName, 1, 1, 3, nullptr); APSARA_TEST_TRUE(pipeline.FlushBatch()); APSARA_TEST_EQUAL(0U, TimeoutFlushManager::GetInstance()->mTimeoutRecords.size()); APSARA_TEST_EQUAL(2U, TimeoutFlushManager::GetInstance()->mDeletedFlushers.size()); TimeoutFlushManager::GetInstance()->FlushTimeoutBatch(); } { // some failed const_cast<FlusherMock*>(static_cast<const FlusherMock*>(pipeline.mFlushers[0]->GetPlugin()))->mIsValid = false; TimeoutFlushManager::GetInstance()->UpdateRecord(configName, 0, 1, 3, nullptr); TimeoutFlushManager::GetInstance()->UpdateRecord(configName, 1, 1, 3, nullptr); APSARA_TEST_FALSE(pipeline.FlushBatch()); APSARA_TEST_EQUAL(0U, TimeoutFlushManager::GetInstance()->mTimeoutRecords.size()); APSARA_TEST_EQUAL(2U, TimeoutFlushManager::GetInstance()->mDeletedFlushers.size()); TimeoutFlushManager::GetInstance()->FlushTimeoutBatch(); } } void PipelineUnittest::TestInProcessingCount() const { auto pipeline = make_shared<CollectionPipeline>(); pipeline->mPluginID.store(0); pipeline->mInProcessCnt.store(0); CollectionPipelineContext ctx; unique_ptr<BoundedProcessQueue> processQueue; processQueue.reset(new BoundedProcessQueue(2, 2, 3, 0, 1, ctx)); vector<PipelineEventGroup> group; group.emplace_back(make_shared<SourceBuffer>()); auto pipeline2 = make_shared<CollectionPipeline>(); CollectionPipelineManager::GetInstance()->mPipelineNameEntityMap[""] = pipeline2; processQueue->EnablePop(); processQueue->Push(GenerateProcessItem(pipeline)); APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load()); APSARA_TEST_EQUAL(0, pipeline2->mInProcessCnt.load()); unique_ptr<ProcessQueueItem> item; APSARA_TEST_TRUE(processQueue->Pop(item)); APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load()); APSARA_TEST_EQUAL(1, pipeline2->mInProcessCnt.load()); pipeline2->SubInProcessCnt(); APSARA_TEST_EQUAL(0, pipeline2->mInProcessCnt.load()); } void PipelineUnittest::TestWaitAllItemsInProcessFinished() const { auto pipeline = make_shared<CollectionPipeline>(); pipeline->mPluginID.store(0); pipeline->mInProcessCnt.store(0); pipeline->mInProcessCnt.store(1); std::future<void> future = std::async(std::launch::async, &CollectionPipeline::WaitAllItemsInProcessFinished, pipeline.get()); // block APSARA_TEST_NOT_EQUAL(std::future_status::ready, future.wait_for(std::chrono::seconds(0))); pipeline->mInProcessCnt.store(0); // recover usleep(3000); APSARA_TEST_EQUAL(std::future_status::ready, future.wait_for(std::chrono::seconds(0))); } void PipelineUnittest::TestMultiFlusherAndRouter() const { unique_ptr<Json::Value> configJson; string configStr, errorMsg; unique_ptr<CollectionConfig> config; unique_ptr<CollectionPipeline> pipeline; // new pipeline configStr = R"( { "global": { "ProcessPriority": 1 }, "inputs": [ { "Type": "input_file", "FilePaths": [ "/home/test.log" ] } ], "flushers": [ { "Type": "flusher_sls", "TelemetryType": "arms_traces", "Project": "test_project", "Region": "test_region", "Endpoint": "test_endpoint", "Match": { "Type": "tag", "Key": "data_type", "Value": "trace" } }, { "Type": "flusher_sls", "TelemetryType": "arms_metrics", "Project": "test_project", "Region": "test_region", "Endpoint": "test_endpoint", "Match": { "Type": "tag", "Key": "data_type", "Value": "metric" } }, { "Type": "flusher_sls", "TelemetryType": "arms_agentinfo", "Project": "test_project", "Region": "test_region", "Endpoint": "test_endpoint", "Match": { "Type": "tag", "Key": "data_type", "Value": "agent_info" } } ] } )"; configJson.reset(new Json::Value()); APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg)); config.reset(new CollectionConfig(configName, std::move(configJson))); APSARA_TEST_TRUE(config->Parse()); pipeline.reset(new CollectionPipeline()); APSARA_TEST_TRUE(pipeline->Init(std::move(*config))); } UNIT_TEST_CASE(PipelineUnittest, OnSuccessfulInit) UNIT_TEST_CASE(PipelineUnittest, OnFailedInit) UNIT_TEST_CASE(PipelineUnittest, TestProcessQueue) UNIT_TEST_CASE(PipelineUnittest, OnInitVariousTopology) UNIT_TEST_CASE(PipelineUnittest, OnInputFileWithJsonMultiline) UNIT_TEST_CASE(PipelineUnittest, OnInputFileWithContainerDiscovery) UNIT_TEST_CASE(PipelineUnittest, TestProcess) UNIT_TEST_CASE(PipelineUnittest, TestSend) UNIT_TEST_CASE(PipelineUnittest, TestFlushBatch) UNIT_TEST_CASE(PipelineUnittest, TestInProcessingCount) UNIT_TEST_CASE(PipelineUnittest, TestWaitAllItemsInProcessFinished) UNIT_TEST_CASE(PipelineUnittest, TestMultiFlusherAndRouter) } // namespace logtail UNIT_TEST_MAIN