in core/unittest/config/PipelineConfigUnittest.cpp [49:1275]
void PipelineConfigUnittest::HandleValidConfig() const {
unique_ptr<Json::Value> configJson;
string configStr, errorMsg;
unique_ptr<CollectionConfig> config;
configStr = R"(
{
"createTime": 123456789,
"global": {
"TopicType": "none"
},
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_json"
}
],
"aggregators":[
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
],
"extensions": [
{
"Type": "ext_basicauth"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(configName, config->mName);
APSARA_TEST_EQUAL(123456789U, config->mCreateTime);
APSARA_TEST_NOT_EQUAL(config->mGlobal, nullptr);
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(2U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mAggregators.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_EQUAL(1U, config->mExtensions.size());
// topology 1: native -> native -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Match": {
"Type": "event_type",
"Value": "log"
}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_EQUAL(1U, config->mRouter.size());
APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_FALSE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_FALSE(config->HasGoPlugin());
// topology 2: extended -> native -> native
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 3: (native, extended) -> native -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 4: native -> extended -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 5: extended -> extended -> native
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 6: (native, extended) -> extended -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 7: native -> (native -> extended) -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(2U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 8: extended -> (native -> extended) -> native
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 9: (native, extended) -> (native -> extended) -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 10: native -> none -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Match": {
"Type": "event_type",
"Value": "log"
}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_EQUAL(1U, config->mRouter.size());
APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_FALSE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_FALSE(config->HasGoPlugin());
// topology 11: extended -> none -> native (future changes maybe applied)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 12: (native, extended) -> none -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 13: native -> native -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 14: extended -> native -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 15: (native, extended) -> native -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 16: native -> extended -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 17: extended -> extended -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 18: (native, extended) -> extended -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 19: native -> (native -> extended) -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(2U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 20: extended -> (native -> extended) -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 21: (native, extended) -> (native -> extended) -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 22: native -> none -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 23: extended -> none -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 24: (native, extended) -> none -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 25: native -> native -> (native, extended) (future changes maybe applied)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 26: extended -> native -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 27: (native, extended) -> native -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 28: native -> extended -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 29: extended -> extended -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 30: (native, extended) -> extended -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 31: native -> (naive -> extended) -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(2U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 32: extended -> (native -> extended) -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 33: (native, extended) -> (native -> extended) -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 34: native -> none -> (native, extended) (future changes maybe applied)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 35: extended -> none -> (native, extended) (future changes maybe applied)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 36: (native, extended) -> none -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
}