void PipelineConfigUnittest::HandleValidConfig()

in core/unittest/config/PipelineConfigUnittest.cpp [49:1275]


void PipelineConfigUnittest::HandleValidConfig() const {
    unique_ptr<Json::Value> configJson;
    string configStr, errorMsg;
    unique_ptr<CollectionConfig> config;

    configStr = R"(
        {
            "createTime": 123456789,
            "global": {
                "TopicType": "none"
            },
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                },
                {
                    "Type": "processor_json"
                }
            ],
            "aggregators":[
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ],
            "extensions": [
                {
                    "Type": "ext_basicauth"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(configName, config->mName);
    APSARA_TEST_EQUAL(123456789U, config->mCreateTime);
    APSARA_TEST_NOT_EQUAL(config->mGlobal, nullptr);
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(2U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mAggregators.size());
    APSARA_TEST_EQUAL(2U, config->mFlushers.size());
    APSARA_TEST_EQUAL(1U, config->mExtensions.size());

    // topology 1: native -> native -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Match": {
                        "Type": "event_type",
                        "Value": "log"
                    }
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(1U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_EQUAL(1U, config->mRouter.size());
    APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_FALSE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_FALSE(config->HasGoPlugin());

    // topology 2: extended -> native -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 3: (native, extended) -> native -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 4: native -> extended -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "processors": [
                {
                    "Type": "processor_json"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(1U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 5: extended -> extended -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_json"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(1U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 6: (native, extended) -> extended -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 7: native -> (native -> extended) -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                },
                {
                    "Type": "processor_json"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(2U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 8: extended -> (native -> extended) -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 9: (native, extended) -> (native -> extended) -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 10: native -> none -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls",
                    "Match": {
                        "Type": "event_type",
                        "Value": "log"
                    }
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(0U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_EQUAL(1U, config->mRouter.size());
    APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_FALSE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_FALSE(config->HasGoPlugin());

    // topology 11: extended -> none -> native (future changes may be applied)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(0U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 12: (native, extended) -> none -> native
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 13: native -> native -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(1U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 14: extended -> native -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 15: (native, extended) -> native -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 16: native -> extended -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "processors": [
                {
                    "Type": "processor_json"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(1U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 17: extended -> extended -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_json"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(1U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 18: (native, extended) -> extended -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 19: native -> (native -> extended) -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                },
                {
                    "Type": "processor_json"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(2U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 20: extended -> (native -> extended) -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 21: (native, extended) -> (native -> extended) -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 22: native -> none -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(0U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_FALSE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 23: extended -> none -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(0U, config->mProcessors.size());
    APSARA_TEST_EQUAL(1U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 24: (native, extended) -> none -> extended
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 25: native -> native -> (native, extended) (future changes may be applied)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(1U, config->mProcessors.size());
    APSARA_TEST_EQUAL(2U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 26: extended -> native -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 27: (native, extended) -> native -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 28: native -> extended -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "processors": [
                {
                    "Type": "processor_json"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(1U, config->mProcessors.size());
    APSARA_TEST_EQUAL(2U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 29: extended -> extended -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_json"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(1U, config->mProcessors.size());
    APSARA_TEST_EQUAL(2U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 30: (native, extended) -> extended -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_regex"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 31: native -> (native -> extended) -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                },
                {
                    "Type": "processor_json"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(2U, config->mProcessors.size());
    APSARA_TEST_EQUAL(2U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 32: extended -> (native -> extended) -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 33: (native, extended) -> (native -> extended) -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "processors": [
                {
                    "Type": "processor_parse_regex_native"
                },
                {
                    "Type": "processor_regex"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());

    // topology 34: native -> none -> (native, extended) (future changes may be applied)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(0U, config->mProcessors.size());
    APSARA_TEST_EQUAL(2U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_TRUE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 35: extended -> none -> (native, extended) (future changes may be applied)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_TRUE(config->Parse());
    APSARA_TEST_EQUAL(1U, config->mInputs.size());
    APSARA_TEST_EQUAL(0U, config->mProcessors.size());
    APSARA_TEST_EQUAL(2U, config->mFlushers.size());
    APSARA_TEST_TRUE(config->ShouldNativeFlusherConnectedByGoPipeline());
    APSARA_TEST_TRUE(config->IsFlushingThroughGoPipelineExisted());
    // APSARA_TEST_FALSE(config->IsProcessRunnerInvolved());
    APSARA_TEST_TRUE(config->HasGoPlugin());

    // topology 36: (native, extended) -> none -> (native, extended)
    configStr = R"(
        {
            "inputs": [
                {
                    "Type": "input_file"
                },
                {
                    "Type": "service_docker_stdout"
                }
            ],
            "aggregators": [
                {
                    "Type": "aggregator_context"
                }
            ],
            "flushers": [
                {
                    "Type": "flusher_sls"
                },
                {
                    "Type": "flusher_http"
                }
            ]
        }
    )";
    configJson.reset(new Json::Value());
    APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
    config.reset(new CollectionConfig(configName, std::move(configJson)));
    APSARA_TEST_FALSE(config->Parse());
}