void PipelineUnittest::OnInitVariousTopology()

in core/unittest/pipeline/PipelineUnittest.cpp [468:2337]


void PipelineUnittest::OnInitVariousTopology() const {
    unique_ptr<Json::Value> configJson;
    Json::Value goPipelineWithInput, goPipelineWithoutInput;
    string configStr, goPipelineWithInputStr, goPipelineWithoutInputStr, errorMsg;
    unique_ptr<CollectionConfig> config;
    unique_ptr<CollectionPipeline> pipeline;

    // topology 1: native -> native -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());

    // topology 2: extended -> native -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 3: (native, extended) -> native -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 4: native -> extended -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                }
            ]
        }
    )";
    goPipelineWithoutInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "DefaultLogQueueSize": 10,
                "EnableProcessorTag": true
            },
            "processors": [
                {
                    "type": "processor_regex/3",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/4",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_sls/5",
                    "detail": {
                        "EnableShardHash": false
                    }
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithoutInput.clear();

    // topology 5: extended -> extended -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                }
            ]
        }
    )";
    goPipelineWithInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "EnableProcessorTag": true
            },
            "inputs": [
                {
                    "type": "service_docker_stdout/1",
                    "detail": {}
                }
            ],
            "processors": [
                {
                    "type": "processor_regex/2",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/3",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_sls/4",
                    "detail": {
                        "EnableShardHash": false
                    }
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithInput.clear();

    // topology 6: (native, extended) -> extended -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 7: native -> (native -> extended) -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                }
            ]
        }
    )";
    goPipelineWithoutInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "DefaultLogQueueSize" : 10
            },
            "processors": [
                {
                    "type": "processor_regex/4",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/5",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_sls/6",
                    "detail": {
                        "EnableShardHash": false
                    }
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithoutInput.clear();

    // topology 8: extended -> (native -> extended) -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 9: (native, extended) -> (native -> extended) -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 10: native -> none -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());

    // topology 11: extended -> none -> native (future changes maybe applied)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                }
            ]
        }
    )";
    goPipelineWithInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "EnableProcessorTag": true
            },
            "inputs": [
                {
                    "type": "service_docker_stdout/1",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/2",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_sls/3",
                    "detail": {
                        "EnableShardHash": false
                    }
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithInput.clear();

    // topology 12: (native, extended) -> none -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 13: native -> native -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithoutInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "DefaultLogQueueSize" : 10
            },
            "aggregators": [
                {
                    "type": "aggregator_default/4",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_http/5",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
    APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithoutInput.clear();

    // topology 14: extended -> native -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 15: (native, extended) -> native -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 16: native -> extended -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithoutInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "DefaultLogQueueSize" : 10,
                "EnableProcessorTag": true
            },
            "processors": [
                {
                    "type": "processor_regex/3",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/4",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_http/5",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
    APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithoutInput.clear();

    // topology 17: extended -> extended -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "EnableProcessorTag": true
            },
            "inputs": [
                {
                    "type": "service_docker_stdout/1",
                    "detail": {}
                }
            ],
            "processors": [
                {
                    "type": "processor_regex/2",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/3",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_http/4",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
    APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
    APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithInput.clear();

    // topology 18: (native, extended) -> extended -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 19: native -> (native -> extended) -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithoutInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "DefaultLogQueueSize" : 10
            },
            "processors": [
                {
                    "type": "processor_regex/4",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/5",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_http/6",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
    APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithoutInput.clear();

    // topology 20: extended -> (native -> extended) -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 21: (native, extended) -> (native -> extended) -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 22: native -> none -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithoutInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "DefaultLogQueueSize" : 10
            },
            "aggregators": [
                {
                    "type": "aggregator_default/3",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_http/4",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
    APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithoutInput.clear();

    // topology 23: extended -> none -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "EnableProcessorTag": true
            },
            "inputs": [
                {
                    "type": "service_docker_stdout/1",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/2",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_http/3",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->GetFlushers().size());
    APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
    APSARA_TEST_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithInput.clear();

    // topology 24: (native, extended) -> none -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 25: native -> native -> (native, extended) (future changes maybe applied)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithoutInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "DefaultLogQueueSize" : 10
            },
            "aggregators": [
                {
                    "type": "aggregator_default/4",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_sls/5",
                    "detail": {
                        "EnableShardHash": false
                    }
                },
                {
                    "type": "flusher_http/6",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithoutInput.clear();

    // topology 26: extended -> native -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 27: (native, extended) -> native -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 28: native -> extended -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithoutInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "DefaultLogQueueSize" : 10,
                "EnableProcessorTag": true
            },
            "processors": [
                {
                    "type": "processor_regex/3",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/4",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_sls/5",
                    "detail": {
                        "EnableShardHash": false
                    }
                },
                {
                    "type": "flusher_http/6",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithoutInput.clear();

    // topology 29: extended -> extended -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "EnableProcessorTag": true
            },
            "inputs": [
                {
                    "type": "service_docker_stdout/1",
                    "detail": {}
                }
            ],
            "processors": [
                {
                    "type": "processor_regex/2",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/3",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_sls/4",
                    "detail": {
                        "EnableShardHash": false
                    }
                },
                {
                    "type": "flusher_http/5",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithInput.clear();

    // topology 30: (native, extended) -> extended -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 31: native -> (native -> extended) -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithoutInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "DefaultLogQueueSize" : 10
            },
            "processors": [
                {
                    "type": "processor_regex/4",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/5",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_sls/6",
                    "detail": {
                        "EnableShardHash": false
                    }
                },
                {
                    "type": "flusher_http/7",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithoutInput.clear();

    // topology 32: extended -> (native -> extended) -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 33: (native, extended) -> (native -> extended) -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native",
                    "SourceKey": "content",
                    "Regex": ".*",
                    "Keys": ["key"]
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 34: native -> none -> (native, extended) (future changes maybe applied)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithoutInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "DefaultLogQueueSize" : 10
            },
            "aggregators": [
                {
                    "type": "aggregator_default/3",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_sls/4",
                    "detail": {
                        "EnableShardHash": false
                    }
                },
                {
                    "type": "flusher_http/5",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithoutInputStr, goPipelineWithoutInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(1U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(1U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithInput.isNull());
    APSARA_TEST_EQUAL(goPipelineWithoutInput.toStyledString(), pipeline->mGoPipelineWithoutInput.toStyledString());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithoutInput.clear();

    // topology 35: extended -> none -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    goPipelineWithInputStr = R"(
        {
            "global" : {
                "EnableTimestampNanosecond": false,
                "UsingOldContentTag": false,
                "EnableProcessorTag": true
            },
            "inputs": [
                {
                    "type": "service_docker_stdout/1",
                    "detail": {}
                }
            ],
            "aggregators": [
                {
                    "type": "aggregator_default/2",
                    "detail": {}
                }
            ],
            "flushers": [
                {
                    "type": "flusher_sls/3",
                    "detail": {
                        "EnableShardHash": false
                    }
                },
                {
                    "type": "flusher_http/4",
                    "detail": {}
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(goPipelineWithInputStr, goPipelineWithInput, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    pipeline.reset(new CollectionPipeline());
    APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
    APSARA_TEST_EQUAL(0U, pipeline->mInputs.size());
    APSARA_TEST_EQUAL(0U, pipeline->mPipelineInnerProcessorLine.size());
    APSARA_TEST_EQUAL(0U, pipeline->mProcessorLine.size());
    APSARA_TEST_EQUAL(1U, pipeline->GetFlushers().size());
    APSARA_TEST_EQUAL(goPipelineWithInput.toStyledString(), pipeline->mGoPipelineWithInput.toStyledString());
    APSARA_TEST_TRUE(pipeline->mGoPipelineWithoutInput.isNull());
    APSARA_TEST_NOT_EQUAL(nullptr, pipeline->GetContext().GetSLSInfo());
    goPipelineWithInput.clear();

    // topology 36: (native, extended) -> none -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file",
                    "FilePaths": [
                        "/home/test.log"
                    ]
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Project": "test_project",
                    "Logstore": "test_logstore",
                    "Region": "test_region",
                    "Endpoint": "test_endpoint",
                    "EnableShardHash": false
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());
}