in core/unittest/pipeline/PipelineUnittest.cpp [468:2337]
void PipelineUnittest::OnInitVariousTopology() const {
unique_ptr<Json::Value> configJson;
Json::Value goPipelineWithInput, goPipelineWithoutInput;
string configStr, goPipelineWithInputStr, goPipelineWithoutInputStr, errorMsg;
unique_ptr<CollectionConfig> config;
unique_ptr<CollectionPipeline> pipeline;
// topology 1: native -> native -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
// topology 2: extended -> native -> native
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 3: (native, extended) -> native -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 4: native -> extended -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
}
]
}
)";
goPipelineWithoutInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"DefaultLogQueueSize": 10,
"EnableProcessorTag": true
},
"processors": [
{
"type": "processor_regex/3",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/4",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_sls/5",
"detail": {
"EnableShardHash": false
}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithoutInput.clear();
// topology 5: extended -> extended -> native
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
}
]
}
)";
goPipelineWithInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"EnableProcessorTag": true
},
"inputs": [
{
"type": "service_docker_stdout/1",
"detail": {}
}
],
"processors": [
{
"type": "processor_regex/2",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/3",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_sls/4",
"detail": {
"EnableShardHash": false
}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithInput.clear();
// topology 6: (native, extended) -> extended -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 7: native -> (native -> extended) -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
},
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
}
]
}
)";
goPipelineWithoutInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"DefaultLogQueueSize" : 10
},
"processors": [
{
"type": "processor_regex/4",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/5",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_sls/6",
"detail": {
"EnableShardHash": false
}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithoutInput.clear();
// topology 8: extended -> (native -> extended) -> native
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
},
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 9: (native, extended) -> (native -> extended) -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
},
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 10: native -> none -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
// topology 11: extended -> none -> native (future changes may be applied)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
}
]
}
)";
goPipelineWithInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"EnableProcessorTag": true
},
"inputs": [
{
"type": "service_docker_stdout/1",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/2",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_sls/3",
"detail": {
"EnableShardHash": false
}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithInput.clear();
// topology 12: (native, extended) -> none -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 13: native -> native -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithoutInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"DefaultLogQueueSize" : 10
},
"aggregators": [
{
"type": "aggregator_default/4",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_http/5",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithoutInput.clear();
// topology 14: extended -> native -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 15: (native, extended) -> native -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 16: native -> extended -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithoutInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"DefaultLogQueueSize" : 10,
"EnableProcessorTag": true
},
"processors": [
{
"type": "processor_regex/3",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/4",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_http/5",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithoutInput.clear();
// topology 17: extended -> extended -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"EnableProcessorTag": true
},
"inputs": [
{
"type": "service_docker_stdout/1",
"detail": {}
}
],
"processors": [
{
"type": "processor_regex/2",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/3",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_http/4",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithInput.clear();
// topology 18: (native, extended) -> extended -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 19: native -> (native -> extended) -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
},
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithoutInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"DefaultLogQueueSize" : 10
},
"processors": [
{
"type": "processor_regex/4",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/5",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_http/6",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithoutInput.clear();
// topology 20: extended -> (native -> extended) -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
},
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 21: (native, extended) -> (native -> extended) -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
},
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 22: native -> none -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithoutInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"DefaultLogQueueSize" : 10
},
"aggregators": [
{
"type": "aggregator_default/3",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_http/4",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithoutInput.clear();
// topology 23: extended -> none -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"EnableProcessorTag": true
},
"inputs": [
{
"type": "service_docker_stdout/1",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/2",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_http/3",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithInput.clear();
// topology 24: (native, extended) -> none -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 25: native -> native -> (native, extended) (future changes may be applied)
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithoutInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"DefaultLogQueueSize" : 10
},
"aggregators": [
{
"type": "aggregator_default/4",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_sls/5",
"detail": {
"EnableShardHash": false
}
},
{
"type": "flusher_http/6",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithoutInput.clear();
// topology 26: extended -> native -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 27: (native, extended) -> native -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 28: native -> extended -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithoutInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"DefaultLogQueueSize" : 10,
"EnableProcessorTag": true
},
"processors": [
{
"type": "processor_regex/3",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/4",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_sls/5",
"detail": {
"EnableShardHash": false
}
},
{
"type": "flusher_http/6",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithoutInput.clear();
// topology 29: extended -> extended -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"EnableProcessorTag": true
},
"inputs": [
{
"type": "service_docker_stdout/1",
"detail": {}
}
],
"processors": [
{
"type": "processor_regex/2",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/3",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_sls/4",
"detail": {
"EnableShardHash": false
}
},
{
"type": "flusher_http/5",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithInput.clear();
// topology 30: (native, extended) -> extended -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 31: native -> (native -> extended) -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
},
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithoutInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"DefaultLogQueueSize" : 10
},
"processors": [
{
"type": "processor_regex/4",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/5",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_sls/6",
"detail": {
"EnableShardHash": false
}
},
{
"type": "flusher_http/7",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithoutInput.clear();
// topology 32: extended -> (native -> extended) -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
},
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 33: (native, extended) -> (native -> extended) -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native",
"SourceKey": "content",
"Regex": ".*",
"Keys": ["key"]
},
{
"Type": "processor_regex"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 34: native -> none -> (native, extended) (future changes may be applied)
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithoutInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"DefaultLogQueueSize" : 10
},
"aggregators": [
{
"type": "aggregator_default/3",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_sls/4",
"detail": {
"EnableShardHash": false
}
},
{
"type": "flusher_http/5",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithoutInput.clear();
// topology 35: extended -> none -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
goPipelineWithInputStr = R"(
{
"global" : {
"EnableTimestampNanosecond": false,
"UsingOldContentTag": false,
"EnableProcessorTag": true
},
"inputs": [
{
"type": "service_docker_stdout/1",
"detail": {}
}
],
"aggregators": [
{
"type": "aggregator_default/2",
"detail": {}
}
],
"flushers": [
{
"type": "flusher_sls/3",
"detail": {
"EnableShardHash": false
}
},
{
"type": "flusher_http/4",
"detail": {}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
goPipelineWithInput.clear();
// topology 36: (native, extended) -> none -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/home/test.log"
]
},
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint",
"EnableShardHash": false
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
}