void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher()

in core/unittest/config/CommonConfigProviderUnittest.cpp [275:719]


void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() {
    // add config
    {
        string config = R"(
        {
            "config_server_list": [
                {
                    "cluster": "community",
                    "endpoint_list": [
                        "test.config.com:80"
                    ]
                }
            ],
            "ilogtail_tags": {
                "key1": "value1",
                "key2": "value2"
            }
        }
        )";
        APSARA_TEST_TRUE_FATAL(writeJsonToFile(config, ilogtailConfigPath));

        MockCommonConfigProvider provider;
        auto setResponse
            = [&provider](const string& operation, const string& reqBody, const string& configType, std::string& resp) {
                  static int sequence_num = 0;
                  configserver::proto::v2::HeartbeatRequest heartbeatReq;
                  heartbeatReq.ParseFromString(reqBody);
                  APSARA_TEST_EQUAL(heartbeatReq.sequence_num(), sequence_num);
                  sequence_num++;
                  APSARA_TEST_TRUE(heartbeatReq.capabilities() & configserver::proto::v2::AcceptsInstanceConfig);
                  APSARA_TEST_TRUE(heartbeatReq.capabilities()
                                   & configserver::proto::v2::AcceptsContinuousPipelineConfig);
                  APSARA_TEST_EQUAL(heartbeatReq.instance_id(), provider.GetInstanceId());
                  APSARA_TEST_EQUAL(heartbeatReq.agent_type(), "LoongCollector");
                  APSARA_TEST_EQUAL(heartbeatReq.attributes().ip(), LoongCollectorMonitor::mIpAddr);
                  APSARA_TEST_EQUAL(heartbeatReq.attributes().hostname(), LoongCollectorMonitor::mHostname);
                  APSARA_TEST_EQUAL(heartbeatReq.attributes().version(), ILOGTAIL_VERSION);
                  auto it = heartbeatReq.tags().begin();
                  APSARA_TEST_EQUAL(it->name(), "key1");
                  APSARA_TEST_EQUAL(it->value(), "value1");
                  it++;
                  APSARA_TEST_EQUAL(it->name(), "key2");
                  APSARA_TEST_EQUAL(it->value(), "value2");

                  APSARA_TEST_EQUAL(heartbeatReq.running_status(), "running");
                  APSARA_TEST_EQUAL(heartbeatReq.startup_time(), provider.mStartTime);


                  configserver::proto::v2::HeartbeatResponse heartbeatRespPb;
                  heartbeatRespPb.set_capabilities(configserver::proto::v2::ResponseFlags::ReportFullState);
                  {
                      auto pipeline = heartbeatRespPb.mutable_continuous_pipeline_config_updates();
                      auto configDetail = pipeline->Add();
                      configDetail->set_name("config1");
                      configDetail->set_version(1);
                      configDetail->set_detail(R"(
                        {
                            "enable": true,
                            "flushers": [
                                {
                                    "OnlyStdout": true,
                                    "Type": "flusher_stdout"
                                }
                            ],
                            "inputs": [
                                {
                                    "FilePaths": [
                                        "\/workspaces\/test1.log"
                                    ],
                                    "Type": "input_file"
                                }
                            ]
                        }
                        )");
                      configDetail = pipeline->Add();
                      configDetail->set_name("config2");
                      configDetail->set_version(1);
                      configDetail->set_detail(R"(
                {
                    "enable": true,
                    "flushers": 
                        {
                            "OnlyStdout": true,
                            "Type": "flusher_stdout"
                        }
                    ],
                    "inputs": [
                        {
                            "FilePaths": [
                                "\/workspaces\/test1.log"
                            ],
                            "Type": "input_file"
                        }
                    ]
                }
                )");
                  }
                  {
                      auto instanceconfig = heartbeatRespPb.mutable_instance_config_updates();
                      auto configDetail = instanceconfig->Add();
                      configDetail->set_name("instanceconfig1");
                      configDetail->set_version(1);
                      configDetail->set_detail(R"(
                        {
                                "enable": true,
                                "max_bytes_per_sec": 100012031023
                            }
                        )");
                      configDetail = instanceconfig->Add();
                      configDetail->set_name("instanceconfig2");
                      configDetail->set_version(1);
                      configDetail->set_detail(R"(
                        {
                                "enable": true
                                "max_bytes_per_sec": 100012031023
                            }
                        )");
                  }
                  {
                      auto commandconfig = heartbeatRespPb.mutable_onetime_pipeline_config_updates();
                      auto configDetail = commandconfig->Add();
                      configDetail->set_name("commandconfig1");
                      configDetail->set_detail(R"(
                        {
                                "enable": true,
                                "max_bytes_per_sec": 100012031023
                            }
                        )");
                  }
                  heartbeatRespPb.SerializeToString(&resp);
              };

        EXPECT_CALL(provider, SendHttpRequest(::testing::_, ::testing::_, ::testing::_, ::testing::_, ::testing::_))
            .WillRepeatedly(::testing::DoAll(::testing::Invoke([setResponse](const string& operation,
                                                                             const string& reqBody,
                                                                             const string& configType,
                                                                             const std::string& requestId,
                                                                             std::string& resp) {
                                                 setResponse(operation, reqBody, configType, resp);
                                             }),
                                             ::testing::Return(true)));


        provider.Init("common_v2");
        APSARA_TEST_EQUAL(provider.sName, "common config provider");
        APSARA_TEST_EQUAL(provider.mConfigServerAvailable, true);
        APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 1);
        APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].host, "test.config.com");
        APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].port, 80);
        APSARA_TEST_EQUAL(provider.mConfigServerTags.size(), 2);
        APSARA_TEST_EQUAL(provider.mConfigServerTags["key1"], "value1");
        APSARA_TEST_EQUAL(provider.mConfigServerTags["key2"], "value2");
        APSARA_TEST_EQUAL(provider.GetOneConfigServerAddress(false).host, "test.config.com");
        APSARA_TEST_EQUAL(provider.GetOneConfigServerAddress(false).port, 80);
        APSARA_TEST_EQUAL(provider.mConfigServerAddressId, 0);
        APSARA_TEST_EQUAL(provider.GetOneConfigServerAddress(true).host, "test.config.com");
        APSARA_TEST_EQUAL(provider.GetOneConfigServerAddress(true).port, 80);
        APSARA_TEST_EQUAL(provider.mConfigServerAddressId, 0);

        auto heartbeatRequest = provider.PrepareHeartbeat();
        configserver::proto::v2::HeartbeatResponse heartbeatResponse;
        provider.GetConfigUpdate();

        APSARA_TEST_EQUAL(provider.mContinuousPipelineConfigInfoMap.size(), 2);
        APSARA_TEST_EQUAL(provider.mContinuousPipelineConfigInfoMap["config1"].status, ConfigFeedbackStatus::APPLYING);
        APSARA_TEST_EQUAL(provider.mContinuousPipelineConfigInfoMap["config2"].status, ConfigFeedbackStatus::FAILED);

        // 处理 pipelineconfig
        auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
        size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
        builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
        CollectionPipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
        APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty());
        APSARA_TEST_EQUAL(1U + builtinPipelineCnt, pipelineConfigDiff.first.mAdded.size());
        APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[builtinPipelineCnt].mName, "config1");
        APSARA_TEST_EQUAL(CollectionPipelineManager::GetInstance()->GetAllConfigNames().size(),
                          1U + builtinPipelineCnt);
        APSARA_TEST_TRUE(CollectionPipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr);
        // 再次处理 pipelineconfig
        pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
        CollectionPipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
        APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty());
        APSARA_TEST_TRUE(pipelineConfigDiff.first.mAdded.empty());
        APSARA_TEST_EQUAL(CollectionPipelineManager::GetInstance()->GetAllConfigNames().size(),
                          1U + builtinPipelineCnt);
        APSARA_TEST_TRUE(CollectionPipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr);


        APSARA_TEST_EQUAL(provider.mInstanceConfigInfoMap.size(), 2);
        APSARA_TEST_EQUAL(provider.mInstanceConfigInfoMap["instanceconfig1"].status, ConfigFeedbackStatus::APPLYING);
        APSARA_TEST_EQUAL(provider.mInstanceConfigInfoMap["instanceconfig2"].status, ConfigFeedbackStatus::FAILED);

        // 处理 instanceconfig
        InstanceConfigDiff instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff();
        InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(instanceConfigDiff);
        APSARA_TEST_TRUE(!instanceConfigDiff.IsEmpty());
        APSARA_TEST_EQUAL(1U, instanceConfigDiff.mAdded.size());
        APSARA_TEST_EQUAL(instanceConfigDiff.mAdded[0].mConfigName, "instanceconfig1");
        if (BOOL_FLAG(logtail_mode)) {
            APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 1);
        } else {
            APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 2);
        }
        APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames()[0], "instanceconfig1");
        // 再次处理 instanceconfig
        instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff();
        InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(instanceConfigDiff);
        APSARA_TEST_TRUE(instanceConfigDiff.IsEmpty());
        APSARA_TEST_TRUE(instanceConfigDiff.mAdded.empty());
        if (BOOL_FLAG(logtail_mode)) {
            APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 1);
        } else {
            APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 2);
        }
        APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames()[0], "instanceconfig1");

        provider.Stop();
    }
    // test LoadConfigFile
    {
        MockCommonConfigProvider provider;
        provider.Init("common_v2");
        APSARA_TEST_EQUAL(provider.mContinuousPipelineConfigInfoMap.size(), 1);
        APSARA_TEST_EQUAL(provider.mContinuousPipelineConfigInfoMap["config1"].status, ConfigFeedbackStatus::APPLYING);
        APSARA_TEST_EQUAL(provider.mInstanceConfigInfoMap.size(), 1);
        APSARA_TEST_EQUAL(provider.mInstanceConfigInfoMap["instanceconfig1"].status, ConfigFeedbackStatus::APPLYING);
        provider.Stop();
    }
    // delete config
    {
        string config = R"(
        {
            "config_server_list": [
                {
                    "cluster": "community",
                    "endpoint_list": [
                        "test.config.com:80"
                    ]
                }
            ],
            "ilogtail_tags": {
                "key1": "value1",
                "key2": "value2"
            }
        }
        )";
        APSARA_TEST_TRUE_FATAL(writeJsonToFile(config, ilogtailConfigPath));

        MockCommonConfigProvider provider;
        auto setResponse
            = [&provider](const string& operation, const string& reqBody, const string& configType, std::string& resp) {
                  static int sequence_num = 0;
                  configserver::proto::v2::HeartbeatRequest heartbeatReq;
                  heartbeatReq.ParseFromString(reqBody);
                  APSARA_TEST_EQUAL(heartbeatReq.sequence_num(), sequence_num);
                  sequence_num++;
                  APSARA_TEST_TRUE(heartbeatReq.capabilities() & configserver::proto::v2::AcceptsInstanceConfig);
                  APSARA_TEST_TRUE(heartbeatReq.capabilities()
                                   & configserver::proto::v2::AcceptsContinuousPipelineConfig);
                  APSARA_TEST_EQUAL(heartbeatReq.instance_id(), provider.GetInstanceId());
                  APSARA_TEST_EQUAL(heartbeatReq.agent_type(), "LoongCollector");
                  APSARA_TEST_EQUAL(heartbeatReq.attributes().ip(), LoongCollectorMonitor::mIpAddr);
                  APSARA_TEST_EQUAL(heartbeatReq.attributes().hostname(), LoongCollectorMonitor::mHostname);
                  APSARA_TEST_EQUAL(heartbeatReq.attributes().version(), ILOGTAIL_VERSION);
                  auto it = heartbeatReq.tags().begin();
                  APSARA_TEST_EQUAL(it->name(), "key1");
                  APSARA_TEST_EQUAL(it->value(), "value1");
                  it++;
                  APSARA_TEST_EQUAL(it->name(), "key2");
                  APSARA_TEST_EQUAL(it->value(), "value2");

                  APSARA_TEST_EQUAL(heartbeatReq.running_status(), "running");
                  APSARA_TEST_EQUAL(heartbeatReq.startup_time(), provider.mStartTime);


                  configserver::proto::v2::HeartbeatResponse heartbeatRespPb;
                  heartbeatRespPb.set_capabilities(configserver::proto::v2::ResponseFlags::ReportFullState);
                  // pipeline
                  {
                      auto pipeline = heartbeatRespPb.mutable_continuous_pipeline_config_updates();
                      auto configDetail = pipeline->Add();
                      configDetail->set_name("config1");
                      configDetail->set_version(-1);
                      configDetail->set_detail(R"(
                        {
                            "enable": true,
                            "flushers": [
                                {
                                    "OnlyStdout": true,
                                    "Type": "flusher_stdout"
                                }
                            ],
                            "inputs": [
                                {
                                    "FilePaths": [
                                        "\/workspaces\/test1.log"
                                    ],
                                    "Type": "input_file"
                                }
                            ]
                        }
                        )");
                      configDetail = pipeline->Add();
                      configDetail->set_name("config2");
                      configDetail->set_version(-1);
                      configDetail->set_detail(R"(
                {
                    "enable": true,
                    "flushers": 
                        {
                            "OnlyStdout": true,
                            "Type": "flusher_stdout"
                        }
                    ],
                    "inputs": [
                        {
                            "FilePaths": [
                                "\/workspaces\/test1.log"
                            ],
                            "Type": "input_file"
                        }
                    ]
                }
                )");
                  }
                  // instanceconfig
                  {
                      auto instanceconfig = heartbeatRespPb.mutable_instance_config_updates();
                      auto configDetail = instanceconfig->Add();
                      configDetail->set_name("instanceconfig1");
                      configDetail->set_version(-1);
                      configDetail->set_detail(R"(
                        {
                                "enable": true,
                                "max_bytes_per_sec": 100012031023
                            }
                        )");
                      configDetail = instanceconfig->Add();
                      configDetail->set_name("instanceconfig2");
                      configDetail->set_version(-1);
                      configDetail->set_detail(R"(
                        {
                                "enable": true
                                "max_bytes_per_sec": 100012031023
                            }
                        )");
                  }
                  // commandconfig
                  {
                      auto commandconfig = heartbeatRespPb.mutable_onetime_pipeline_config_updates();
                      auto configDetail = commandconfig->Add();
                      configDetail->set_name("commandconfig1");
                      configDetail->set_detail(R"(
                        {
                                "enable": true,
                                "max_bytes_per_sec": 100012031023
                            }
                        )");
                  }
                  heartbeatRespPb.SerializeToString(&resp);
              };

        EXPECT_CALL(provider, SendHttpRequest(::testing::_, ::testing::_, ::testing::_, ::testing::_, ::testing::_))
            .WillRepeatedly(::testing::DoAll(::testing::Invoke([setResponse](const string& operation,
                                                                             const string& reqBody,
                                                                             const string& configType,
                                                                             const std::string& requestId,
                                                                             std::string& resp) {
                                                 setResponse(operation, reqBody, configType, resp);
                                             }),
                                             ::testing::Return(true)));


        provider.Init("common_v2");
        APSARA_TEST_EQUAL(provider.sName, "common config provider");
        APSARA_TEST_EQUAL(provider.mConfigServerAvailable, true);
        APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 1);
        APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].host, "test.config.com");
        APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].port, 80);
        APSARA_TEST_EQUAL(provider.mConfigServerTags.size(), 2);
        APSARA_TEST_EQUAL(provider.mConfigServerTags["key1"], "value1");
        APSARA_TEST_EQUAL(provider.mConfigServerTags["key2"], "value2");
        APSARA_TEST_EQUAL(provider.GetOneConfigServerAddress(false).host, "test.config.com");
        APSARA_TEST_EQUAL(provider.GetOneConfigServerAddress(false).port, 80);
        APSARA_TEST_EQUAL(provider.mConfigServerAddressId, 0);
        APSARA_TEST_EQUAL(provider.GetOneConfigServerAddress(true).host, "test.config.com");
        APSARA_TEST_EQUAL(provider.GetOneConfigServerAddress(true).port, 80);
        APSARA_TEST_EQUAL(provider.mConfigServerAddressId, 0);

        auto heartbeatRequest = provider.PrepareHeartbeat();
        configserver::proto::v2::HeartbeatResponse heartbeatResponse;
        provider.GetConfigUpdate();

        APSARA_TEST_TRUE(provider.mContinuousPipelineConfigInfoMap.empty());

        // 处理pipelineConfigDiff
        auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
        size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
        builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
        CollectionPipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
        APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty());
        APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mRemoved.size());
        APSARA_TEST_EQUAL(pipelineConfigDiff.first.mRemoved[0], "config1");
        APSARA_TEST_EQUAL(0U + builtinPipelineCnt,
                          CollectionPipelineManager::GetInstance()->GetAllConfigNames().size());
        // 再次处理pipelineConfigDiff
        pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
        CollectionPipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
        APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty());
        APSARA_TEST_TRUE(pipelineConfigDiff.first.mRemoved.empty());
        APSARA_TEST_EQUAL(0U + builtinPipelineCnt,
                          CollectionPipelineManager::GetInstance()->GetAllConfigNames().size());

        APSARA_TEST_TRUE(provider.mInstanceConfigInfoMap.empty());
        // 处理instanceConfigDiff
        InstanceConfigDiff instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff();
        InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(instanceConfigDiff);
        if (BOOL_FLAG(logtail_mode)) {
            APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 0);
        } else {
            APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 1);
            APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames()[0], "loongcollector_config");
        }
        APSARA_TEST_EQUAL(1U, instanceConfigDiff.mRemoved.size());
        APSARA_TEST_EQUAL(instanceConfigDiff.mRemoved[0], "instanceconfig1");

        // 再次处理instanceConfigDiff
        instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff();
        InstanceConfigManager::GetInstance()->UpdateInstanceConfigs(instanceConfigDiff);
        if (BOOL_FLAG(logtail_mode)) {
            APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 0);
        } else {
            APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames().size(), 1);
            APSARA_TEST_EQUAL(InstanceConfigManager::GetInstance()->GetAllConfigNames()[0], "loongcollector_config");
        }
        APSARA_TEST_TRUE(instanceConfigDiff.IsEmpty());
        APSARA_TEST_TRUE(instanceConfigDiff.mRemoved.empty());

        provider.Stop();
    }
}