core/unittest/config/PipelineConfigUnittest.cpp (2,101 lines of code) (raw):
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include <string>
#include "json/json.h"
#include "collection_pipeline/plugin/PluginRegistry.h"
#include "common/JsonUtil.h"
#include "config/CollectionConfig.h"
#include "unittest/Unittest.h"
using namespace std;
namespace logtail {
// Test fixture for CollectionConfig parsing. Each Handle*/Test* member is a test body
// that is defined out of line.
class PipelineConfigUnittest : public testing::Test {
public:
    void HandleValidConfig() const;
    void HandleInvalidCreateTime() const;
    void HandleInvalidGlobal() const;
    void HandleInvalidInputs() const;
    void HandleInvalidProcessors() const;
    void HandleInvalidAggregators() const;
    void HandleInvalidFlushers() const;
    void HandleInvalidExtensions() const;
    void TestReplaceEnvVarRef() const;

protected:
    // Suite-level gtest hook: loads the plugins through the registry singleton.
    static void SetUpTestCase() {
        PluginRegistry::GetInstance()->LoadPlugins();
    }

    // Suite-level gtest hook: unloads the plugins through the registry singleton.
    static void TearDownTestCase() {
        PluginRegistry::GetInstance()->UnloadPlugins();
    }

private:
    // Name passed to every CollectionConfig constructed by the tests.
    const string configName{"test"};
};
void PipelineConfigUnittest::HandleValidConfig() const {
unique_ptr<Json::Value> configJson;
string configStr, errorMsg;
unique_ptr<CollectionConfig> config;
configStr = R"(
{
"createTime": 123456789,
"global": {
"TopicType": "none"
},
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_json"
}
],
"aggregators":[
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
],
"extensions": [
{
"Type": "ext_basicauth"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(configName, config->mName);
APSARA_TEST_EQUAL(123456789U, config->mCreateTime);
APSARA_TEST_NOT_EQUAL(config->mGlobal, nullptr);
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(2U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mAggregators.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_EQUAL(1U, config->mExtensions.size());
// topology 1: native -> native -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Match": {
"Type": "event_type",
"Value": "log"
}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_EQUAL(1U, config->mRouter.size());
APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_FALSE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_FALSE(config->HasGoPlugin());
// topology 2: extended -> native -> native
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 3: (native, extended) -> native -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 4: native -> extended -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 5: extended -> extended -> native
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 6: (native, extended) -> extended -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 7: native -> (native -> extended) -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(2U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 8: extended -> (native -> extended) -> native
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 9: (native, extended) -> (native -> extended) -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 10: native -> none -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls",
"Match": {
"Type": "event_type",
"Value": "log"
}
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_EQUAL(1U, config->mRouter.size());
APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_FALSE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_FALSE(config->HasGoPlugin());
// topology 11: extended -> none -> native (future changes maybe applied)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 12: (native, extended) -> none -> native
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 13: native -> native -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 14: extended -> native -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 15: (native, extended) -> native -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 16: native -> extended -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 17: extended -> extended -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 18: (native, extended) -> extended -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 19: native -> (native -> extended) -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(2U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 20: extended -> (native -> extended) -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 21: (native, extended) -> (native -> extended) -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 22: native -> none -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 23: extended -> none -> extended
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(1U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 24: (native, extended) -> none -> extended
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 25: native -> native -> (native, extended) (future changes maybe applied)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 26: extended -> native -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 27: (native, extended) -> native -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 28: native -> extended -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 29: extended -> extended -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(1U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 30: (native, extended) -> extended -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 31: native -> (naive -> extended) -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_json"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(2U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 32: extended -> (native -> extended) -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 33: (native, extended) -> (native -> extended) -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_regex"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// topology 34: native -> none -> (native, extended) (future changes maybe applied)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 35: extended -> none -> (native, extended) (future changes maybe applied)
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(1U, config->mInputs.size());
APSARA_TEST_EQUAL(0U, config->mProcessors.size());
APSARA_TEST_EQUAL(2U, config->mFlushers.size());
APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
// APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
APSARA_TEST_TRUE(config->HasGoPlugin());
// topology 36: (native, extended) -> none -> (native, extended)
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
},
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
}
void PipelineConfigUnittest::HandleInvalidCreateTime() const {
unique_ptr<Json::Value> configJson;
string configStr, errorMsg;
unique_ptr<CollectionConfig> config;
configStr = R"(
{
"createTime": "123456789",
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
APSARA_TEST_EQUAL(0U, config->mCreateTime);
}
void PipelineConfigUnittest::HandleInvalidGlobal() const {
unique_ptr<Json::Value> configJson;
string configStr, errorMsg;
unique_ptr<CollectionConfig> config;
// global is not of type object
configStr = R"(
{
"global": []
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
}
void PipelineConfigUnittest::HandleInvalidInputs() const {
unique_ptr<Json::Value> configJson;
string configStr, errorMsg;
unique_ptr<CollectionConfig> config;
// no inputs
configStr = R"(
{
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// inputs is not of type array
configStr = R"(
{
"inputs": {},
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// inputs is empty
configStr = R"(
{
"inputs": [],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// inputs element is not of type object
configStr = R"(
{
"inputs": [
[]
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// no Type
configStr = R"(
{
"inputs": [
{
"type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// Type is not of type string
configStr = R"(
{
"inputs": [
{
"Type": true
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// unsupported input
configStr = R"(
{
"inputs": [
{
"Type": "input_unknown"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "input_unknown"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
},
{
"Type": "input_unknown"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
// more than 1 native input
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// native and extended inputs coexist
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
},
{
"Type": "service_docker_stdout"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdoutinput_file"
},
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
}
// Checks that CollectionConfig::Parse() returns false for each malformed or
// disallowed "processors" section below. Every JSON document is well-formed,
// so ParseJsonTable() itself is expected to succeed.
//
// NOTE(review): none of these configs declares "flushers", and
// HandleInvalidFlushers expects a config without flushers to be rejected on
// that ground alone. Confirm that Parse() reaches the processors check first;
// otherwise these assertions hold for a reason unrelated to processors.
void PipelineConfigUnittest::HandleInvalidProcessors() const {
    unique_ptr<Json::Value> configJson;
    string configStr, errorMsg;
    unique_ptr<CollectionConfig> config;

    // processors is not of type array
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": {}
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // processors element is not of type object
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
[]
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // no Type: only the processor entry spells the key as lowercase "type";
    // the input entry keeps a well-formed "Type" so the missing key is
    // confined to the processor entry.
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"type": "processor_parse_regex_native"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // Type is not of type string
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": true
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // unsupported processors: unknown processor behind a native input
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_unknown"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // unsupported processors: unknown processor behind an extended input
    configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_unknown"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // unsupported processors: unknown processor following a native processor
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_unknown"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // native processor plugin comes after extended processor plugin
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_json"
},
{
"Type": "processor_parse_regex_native"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // native processor plugins coexist with extended input plugins
    configStr = R"(
{
"inputs": [
{
"Type": "service_docker_stdout"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // native processor plugins coexist with processor_spl (native first)
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_parse_regex_native"
},
{
"Type": "processor_spl"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // native processor plugins coexist with processor_spl (processor_spl first)
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"processors": [
{
"Type": "processor_spl"
},
{
"Type": "processor_parse_regex_native"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());
}
void PipelineConfigUnittest::HandleInvalidAggregators() const {
unique_ptr<Json::Value> configJson;
string configStr, errorMsg;
unique_ptr<CollectionConfig> config;
// aggregators is not of type array
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"aggregators": {},
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// aggregators element is not of type object
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"aggregators": [
[]
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// no Type
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"aggregators": [
{
"type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// Type is not of type string
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"aggregators": [
{
"Type": true
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// unsupported aggregator
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"aggregators": [
{
"Type": "aggregator_unknown"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// more than 1 aggregator
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"aggregators": [
{
"Type": "aggregator_context"
},
{
"Type": "aggregator_default"
}
],
"flushers": [
{
"Type": "flusher_http"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
// aggregator plugins exist in native flushing mode
configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"aggregators": [
{
"Type": "aggregator_context"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_FALSE(config->Parse());
}
// Exercises CollectionConfig::Parse() against configs whose "flushers" section
// is absent, malformed, or names an unknown plugin. Every JSON document is
// well-formed, so ParseJsonTable() itself is expected to succeed. All cases
// except the last expect Parse() to return false.
void PipelineConfigUnittest::HandleInvalidFlushers() const {
    unique_ptr<Json::Value> configJson;
    string configStr, errorMsg;
    unique_ptr<CollectionConfig> config;

    // no flushers
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // flushers is not of type array
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": {}
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // flushers is empty
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": []
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // flushers element is not of type object
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
[]
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // no Type (the key is spelled in lowercase)
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"type": "flusher_sls"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // Type is not of type string: the key is present but holds a boolean,
    // which distinguishes this case from the missing-key case above.
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": true
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // unsupported flusher: unlike every case above, Parse() is expected to
    // succeed here.
    // NOTE(review): presumably an unrecognized plugin name is accepted at this
    // stage and validated elsewhere - confirm against CollectionConfig::Parse().
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_unknown"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
}
// Checks that CollectionConfig::Parse() returns false for each malformed or
// disallowed "extensions" section below. Every config pairs input_file with
// flusher_sls, so the cases differ only in their extensions section. Every
// JSON document is well-formed, so ParseJsonTable() itself is expected to
// succeed.
void PipelineConfigUnittest::HandleInvalidExtensions() const {
    unique_ptr<Json::Value> configJson;
    string configStr, errorMsg;
    unique_ptr<CollectionConfig> config;

    // extensions is not of type array
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
],
"extensions": {}
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // extensions element is not of type object
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
],
"extensions": [
[]
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // no Type (the key is spelled in lowercase)
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
],
"extensions": [
{
"type": "ext_basicauth"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // Type is not of type string: the key is present but holds a boolean,
    // which distinguishes this case from the missing-key case above.
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
],
"extensions": [
{
"Type": true
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // unsupported extension
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
],
"extensions": [
{
"Type": "ext_unknown"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // extension plugins exist when no extended plugin is given
    configStr = R"(
{
"inputs": [
{
"Type": "input_file"
}
],
"flushers": [
{
"Type": "flusher_sls"
}
],
"extensions": [
{
"Type": "ext_basicauth"
}
]
}
)";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());
}
void PipelineConfigUnittest::TestReplaceEnvVarRef() const {
#if defined(__linux__)
setenv("__path4ut", "_home/$work", 1);
setenv("__file4ut", "!transaction/~un-do.log", 1);
#else
_putenv_s("__path4ut", "_home/$work");
_putenv_s("__file4ut", "!transaction/~un-do.log");
#endif
unique_ptr<Json::Value> configJson;
Json::Value resJson;
string configStr, resStr, errorMsg;
unique_ptr<CollectionConfig> config;
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/${__path4ut}/${__file4ut}"
]
}
]
}
)";
resStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
"/_home/$work/!transaction/~un-do.log"
]
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
APSARA_TEST_TRUE(ParseJsonTable(resStr, resJson, errorMsg));
config.reset(new CollectionConfig(configName, std::move(configJson)));
APSARA_TEST_TRUE(config->ReplaceEnvVar());
APSARA_TEST_TRUE(*config->mDetail == resJson);
}
// One registration per test method declared on PipelineConfigUnittest.
// NOTE(review): UNIT_TEST_CASE is defined outside this file (presumably in
// unittest/Unittest.h) and presumably wraps each method in a fixture-based
// test - confirm there.
UNIT_TEST_CASE(PipelineConfigUnittest, HandleValidConfig)
UNIT_TEST_CASE(PipelineConfigUnittest, HandleInvalidCreateTime)
UNIT_TEST_CASE(PipelineConfigUnittest, HandleInvalidGlobal)
UNIT_TEST_CASE(PipelineConfigUnittest, HandleInvalidInputs)
UNIT_TEST_CASE(PipelineConfigUnittest, HandleInvalidProcessors)
UNIT_TEST_CASE(PipelineConfigUnittest, HandleInvalidAggregators)
UNIT_TEST_CASE(PipelineConfigUnittest, HandleInvalidFlushers)
UNIT_TEST_CASE(PipelineConfigUnittest, HandleInvalidExtensions)
UNIT_TEST_CASE(PipelineConfigUnittest, TestReplaceEnvVarRef)
} // namespace logtail
// NOTE(review): UNIT_TEST_MAIN is defined outside this file; presumably it
// expands to the test binary's main() - confirm in unittest/Unittest.h.
UNIT_TEST_MAIN