core/config/common_provider/CommonConfigProvider.cpp (501 lines of code) (raw):

// Copyright 2023 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "CommonConfigProvider.h" #include <filesystem> #include <iostream> #include <random> #include "json/json.h" #include "app_config/AppConfig.h" #include "application/Application.h" #include "common/LogtailCommonFlags.h" #include "common/StringTools.h" #include "common/UUIDUtil.h" #include "common/YamlUtil.h" #include "common/http/Constant.h" #include "common/http/Curl.h" #include "common/version.h" #include "config/CollectionConfig.h" #include "config/ConfigUtil.h" #include "config/feedbacker/ConfigFeedbackReceiver.h" #include "constants/Constants.h" #include "logger/Logger.h" #include "monitor/Monitor.h" using namespace std; DEFINE_FLAG_INT32(heartbeat_interval, "second", 10); namespace logtail { const string AGENT = "/Agent"; string CommonConfigProvider::configVersion = "version"; void CommonConfigProvider::Init(const string& dir) { sName = "common config provider"; ConfigProvider::Init(dir); LoadConfigFile(); mStartTime = Application::GetInstance()->GetStartTime(); mSequenceNum = 0; const Json::Value& confJson = AppConfig::GetInstance()->GetConfig(); // configserver path /*** demo * { * "config_server_list" : [ * { * "cluster" : "community", * "endpoint_list" : ["test.config.com:80"] * } * } */ if (confJson.isObject() && confJson.isMember("config_server_list") && confJson["config_server_list"].isArray() && confJson["config_server_list"].size() > 0 && confJson["config_server_list"][0].isObject() && confJson["config_server_list"][0].isMember("endpoint_list") && confJson["config_server_list"][0]["endpoint_list"].isArray()) { for (Json::Value::ArrayIndex i = 0; i < confJson["config_server_list"][0]["endpoint_list"].size(); ++i) { if (!confJson["config_server_list"][0]["endpoint_list"][i].isString()) { continue; } vector<string> configServerAddress = SplitString(TrimString(confJson["config_server_list"][0]["endpoint_list"][i].asString()), ":"); if (configServerAddress.size() != 2) { LOG_WARNING( sLogger, ("configserver_address", "format error")( "wrong address", TrimString(confJson["config_server_list"][0]["endpoint_list"][i].asString()))); continue; } string host = configServerAddress[0]; int32_t port = atoi(configServerAddress[1].c_str()); if (port < 1 || port > 65535) { LOG_WARNING(sLogger, ("configserver_address", "illegal port")("port", port)); continue; } mConfigServerAddresses.push_back(ConfigServerAddress(host, port)); } mConfigServerAvailable = true; LOG_INFO(sLogger, ("configserver_address", confJson["config_server_list"][0]["endpoint_list"].toStyledString())); } // tags for configserver if (confJson.isMember("ilogtail_tags") && confJson["ilogtail_tags"].isObject()) { Json::Value::Members members = confJson["ilogtail_tags"].getMemberNames(); for (Json::Value::Members::iterator it = members.begin(); it != members.end(); it++) { mConfigServerTags[*it] = confJson["ilogtail_tags"][*it].asString(); } LOG_INFO(sLogger, ("ilogtail_configserver_tags", confJson["ilogtail_tags"].toStyledString())); } GetConfigUpdate(); mThreadRes = async(launch::async, &CommonConfigProvider::CheckUpdateThread, this); } void CommonConfigProvider::Stop() { { lock_guard<mutex> lock(mThreadRunningMux); mIsThreadRunning = false; } mStopCV.notify_one(); if (!mThreadRes.valid()) { return; } future_status s = mThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, (sName, "stopped successfully")); } else { LOG_WARNING(sLogger, (sName, "forced to stopped")); } } void CommonConfigProvider::LoadConfigFile() { error_code ec; for (auto const& entry : filesystem::directory_iterator(mContinuousPipelineConfigDir, ec)) { Json::Value detail; if (LoadConfigDetailFromFile(entry, detail)) { ConfigInfo info; info.name = entry.path().stem(); if (detail.isMember(CommonConfigProvider::configVersion) && detail[CommonConfigProvider::configVersion].isInt64()) { info.version = detail[CommonConfigProvider::configVersion].asInt64(); } info.status = ConfigFeedbackStatus::APPLYING; info.detail = detail.toStyledString(); { lock_guard<mutex> lockInfoMap(mInfoMapMux); mContinuousPipelineConfigInfoMap[info.name] = info; } ConfigFeedbackReceiver::GetInstance().RegisterContinuousPipelineConfig(info.name, this); } } for (auto const& entry : filesystem::directory_iterator(mInstanceSourceDir, ec)) { Json::Value detail; if (LoadConfigDetailFromFile(entry, detail)) { ConfigInfo info; info.name = entry.path().stem(); if (detail.isMember(CommonConfigProvider::configVersion) && detail[CommonConfigProvider::configVersion].isInt64()) { info.version = detail[CommonConfigProvider::configVersion].asInt64(); } info.status = ConfigFeedbackStatus::APPLYING; info.detail = detail.toStyledString(); { lock_guard<mutex> lockInfoMap(mInfoMapMux); mInstanceConfigInfoMap[info.name] = info; } ConfigFeedbackReceiver::GetInstance().RegisterInstanceConfig(info.name, this); } } } void CommonConfigProvider::CheckUpdateThread() { LOG_INFO(sLogger, (sName, "started")); usleep((rand() % 10) * 100 * 1000); int32_t lastCheckTime = time(NULL); unique_lock<mutex> lock(mThreadRunningMux); while (mIsThreadRunning) { int32_t curTime = time(NULL); if (curTime - lastCheckTime >= INT32_FLAG(heartbeat_interval)) { GetConfigUpdate(); lastCheckTime = curTime; } if (mStopCV.wait_for(lock, chrono::seconds(3), [this]() { return !mIsThreadRunning; })) { break; } } } CommonConfigProvider::ConfigServerAddress CommonConfigProvider::GetOneConfigServerAddress(bool changeConfigServer) { if (0 == mConfigServerAddresses.size()) { return ConfigServerAddress("", -1); // No address available } // Return a random address if (changeConfigServer) { random_device rd; int tmpId = rd() % mConfigServerAddresses.size(); while (mConfigServerAddresses.size() > 1 && tmpId == mConfigServerAddressId) { tmpId = rd() % mConfigServerAddresses.size(); } mConfigServerAddressId = tmpId; } return ConfigServerAddress(mConfigServerAddresses[mConfigServerAddressId].host, mConfigServerAddresses[mConfigServerAddressId].port); } string CommonConfigProvider::GetInstanceId() { return Application::GetInstance()->GetInstanceId(); } void CommonConfigProvider::FillAttributes(configserver::proto::v2::AgentAttributes& attributes) { attributes.set_hostname(LoongCollectorMonitor::mHostname); attributes.set_ip(LoongCollectorMonitor::mIpAddr); attributes.set_version(ILOGTAIL_VERSION); google::protobuf::Map<string, string>* extras = attributes.mutable_extras(); extras->insert({"osDetail", LoongCollectorMonitor::mOsDetail}); } void addConfigInfoToRequest(const std::pair<const string, logtail::ConfigInfo>& configInfo, configserver::proto::v2::ConfigInfo* reqConfig) { reqConfig->set_name(configInfo.second.name); reqConfig->set_message(configInfo.second.message); reqConfig->set_version(configInfo.second.version); switch (configInfo.second.status) { case ConfigFeedbackStatus::UNSET: reqConfig->set_status(configserver::proto::v2::ConfigStatus::UNSET); break; case ConfigFeedbackStatus::APPLYING: reqConfig->set_status(configserver::proto::v2::ConfigStatus::APPLYING); break; case ConfigFeedbackStatus::APPLIED: reqConfig->set_status(configserver::proto::v2::ConfigStatus::APPLIED); break; case ConfigFeedbackStatus::FAILED: reqConfig->set_status(configserver::proto::v2::ConfigStatus::FAILED); break; case ConfigFeedbackStatus::DELETED: reqConfig->set_version(-1); break; } } void CommonConfigProvider::GetConfigUpdate() { if (!mConfigServerAvailable) { return; } auto heartbeatRequest = PrepareHeartbeat(); configserver::proto::v2::HeartbeatResponse heartbeatResponse; if (!SendHeartbeat(heartbeatRequest, heartbeatResponse)) { return; } ::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail> pipelineConfig; if (FetchPipelineConfig(heartbeatResponse, pipelineConfig) && !pipelineConfig.empty()) { LOG_DEBUG(sLogger, ("fetch pipelineConfig, config file number", pipelineConfig.size())); UpdateRemotePipelineConfig(pipelineConfig); } ::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail> instanceConfig; if (FetchInstanceConfig(heartbeatResponse, instanceConfig) && !instanceConfig.empty()) { LOG_DEBUG(sLogger, ("fetch instanceConfig config, config file number", instanceConfig.size())); UpdateRemoteInstanceConfig(instanceConfig); } ++mSequenceNum; } configserver::proto::v2::HeartbeatRequest CommonConfigProvider::PrepareHeartbeat() { configserver::proto::v2::HeartbeatRequest heartbeatReq; string requestID = CalculateRandomUUID(); heartbeatReq.set_request_id(requestID); heartbeatReq.set_sequence_num(mSequenceNum); heartbeatReq.set_capabilities(configserver::proto::v2::AcceptsInstanceConfig | configserver::proto::v2::AcceptsContinuousPipelineConfig); heartbeatReq.set_instance_id(GetInstanceId()); heartbeatReq.set_agent_type("LoongCollector"); FillAttributes(*heartbeatReq.mutable_attributes()); for (auto tag : mConfigServerTags) { configserver::proto::v2::AgentGroupTag* agentGroupTag = heartbeatReq.add_tags(); agentGroupTag->set_name(tag.first); agentGroupTag->set_value(tag.second); } heartbeatReq.set_running_status("running"); heartbeatReq.set_startup_time(mStartTime); { lock_guard<mutex> lockInfoMap(mInfoMapMux); for (const auto& configInfo : mContinuousPipelineConfigInfoMap) { addConfigInfoToRequest(configInfo, heartbeatReq.add_continuous_pipeline_configs()); } for (const auto& configInfo : mInstanceConfigInfoMap) { addConfigInfoToRequest(configInfo, heartbeatReq.add_instance_configs()); } for (const auto& configInfo : mOnetimePipelineConfigInfoMap) { addConfigInfoToRequest(configInfo, heartbeatReq.add_onetime_pipeline_configs()); } } return heartbeatReq; } bool CommonConfigProvider::SendHeartbeat(const configserver::proto::v2::HeartbeatRequest& heartbeatReq, configserver::proto::v2::HeartbeatResponse& heartbeatResponse) { string operation = AGENT; operation.append("/").append("Heartbeat"); string reqBody; heartbeatReq.SerializeToString(&reqBody); std::string heartbeatResp; if (SendHttpRequest(operation, reqBody, "SendHeartbeat", heartbeatReq.request_id(), heartbeatResp)) { configserver::proto::v2::HeartbeatResponse heartbeatRespPb; heartbeatRespPb.ParseFromString(heartbeatResp); heartbeatResponse.Swap(&heartbeatRespPb); return true; } else { return false; } } bool CommonConfigProvider::SendHttpRequest(const string& operation, const string& reqBody, const string& configType, const std::string& requestId, std::string& resp) { // LCOV_EXCL_START ConfigServerAddress configServerAddress = GetOneConfigServerAddress(false); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_PROTOBUF; HttpResponse httpResponse; if (!logtail::SendHttpRequest(make_unique<HttpRequest>(HTTP_POST, false, configServerAddress.host, configServerAddress.port, operation, "", httpHeader, reqBody), httpResponse)) { LOG_WARNING(sLogger, (configType, "fail")("reqBody", reqBody)("host", configServerAddress.host)("port", configServerAddress.port)); return false; } resp = *httpResponse.GetBody<string>(); return true; // LCOV_EXCL_STOP } bool CommonConfigProvider::FetchPipelineConfig( configserver::proto::v2::HeartbeatResponse& heartbeatResponse, ::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>& result) { if (heartbeatResponse.flags() & ::configserver::proto::v2::FetchContinuousPipelineConfigDetail) { return FetchPipelineConfigFromServer(heartbeatResponse, result); } else { result.Swap(heartbeatResponse.mutable_continuous_pipeline_config_updates()); return true; } } bool CommonConfigProvider::FetchInstanceConfig( configserver::proto::v2::HeartbeatResponse& heartbeatResponse, ::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>& result) { if (heartbeatResponse.flags() & ::configserver::proto::v2::FetchContinuousPipelineConfigDetail) { return FetchInstanceConfigFromServer(heartbeatResponse, result); } else { result.Swap(heartbeatResponse.mutable_instance_config_updates()); return true; } } bool CommonConfigProvider::DumpConfigFile(const configserver::proto::v2::ConfigDetail& config, const filesystem::path& sourceDir) { filesystem::path filePath = sourceDir / (config.name() + ".json"); filesystem::path tmpFilePath = sourceDir / (config.name() + ".json.new"); Json::Value detail; std::string errorMsg; if (!ParseConfigDetail(config.detail(), ".json", detail, errorMsg)) { LOG_WARNING(sLogger, ("failed to parse config detail", config.detail())); return false; } detail[CommonConfigProvider::configVersion] = config.version(); string configDetail = detail.toStyledString(); ofstream fout(tmpFilePath); if (!fout) { LOG_WARNING(sLogger, ("failed to open config file", filePath.string())); return false; } fout << configDetail; error_code ec; filesystem::rename(tmpFilePath, filePath, ec); if (ec) { LOG_WARNING( sLogger, ("failed to dump config file", filePath.string())("error code", ec.value())("error msg", ec.message())); filesystem::remove(tmpFilePath, ec); } return true; } void CommonConfigProvider::UpdateRemotePipelineConfig( const google::protobuf::RepeatedPtrField<configserver::proto::v2::ConfigDetail>& configs) { error_code ec; const std::filesystem::path& sourceDir = mContinuousPipelineConfigDir; filesystem::create_directories(sourceDir, ec); if (ec) { StopUsingConfigServer(); LOG_ERROR(sLogger, ("failed to create dir for common configs", "stop receiving config from common config server")( "dir", sourceDir.string())("error code", ec.value())("error msg", ec.message())); return; } // 保证每次往磁盘上dump文件的时候,config watcher不会读到一半的内容,相当于是个目录锁 lock_guard<mutex> lock(mContinuousPipelineMux); for (const auto& config : configs) { filesystem::path filePath = sourceDir / (config.name() + ".json"); if (config.version() == -1) { { lock_guard<mutex> lockInfoMap(mInfoMapMux); mContinuousPipelineConfigInfoMap.erase(config.name()); } filesystem::remove(filePath, ec); ConfigFeedbackReceiver::GetInstance().UnregisterContinuousPipelineConfig(config.name()); } else { if (!DumpConfigFile(config, sourceDir)) { lock_guard<mutex> lockInfoMap(mInfoMapMux); mContinuousPipelineConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(), .version = config.version(), .status = ConfigFeedbackStatus::FAILED, .detail = config.detail()}; continue; } { lock_guard<mutex> lockInfoMap(mInfoMapMux); mContinuousPipelineConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(), .version = config.version(), .status = ConfigFeedbackStatus::APPLYING, .detail = config.detail()}; } ConfigFeedbackReceiver::GetInstance().RegisterContinuousPipelineConfig(config.name(), this); } } } void CommonConfigProvider::UpdateRemoteInstanceConfig( const google::protobuf::RepeatedPtrField<configserver::proto::v2::ConfigDetail>& configs) { error_code ec; const std::filesystem::path& sourceDir = mInstanceSourceDir; filesystem::create_directories(sourceDir, ec); if (ec) { StopUsingConfigServer(); LOG_ERROR(sLogger, ("failed to create dir for common configs", "stop receiving config from common config server")( "dir", sourceDir.string())("error code", ec.value())("error msg", ec.message())); return; } // 保证每次往磁盘上dump文件的时候,config watcher不会读到一半的内容,相当于是个目录锁 lock_guard<mutex> lock(mInstanceMux); for (const auto& config : configs) { filesystem::path filePath = sourceDir / (config.name() + ".json"); if (config.version() == -1) { { lock_guard<mutex> lockInfoMap(mInfoMapMux); mInstanceConfigInfoMap.erase(config.name()); } filesystem::remove(filePath, ec); ConfigFeedbackReceiver::GetInstance().UnregisterInstanceConfig(config.name()); } else { if (!DumpConfigFile(config, sourceDir)) { lock_guard<mutex> lockInfoMap(mInfoMapMux); mInstanceConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(), .version = config.version(), .status = ConfigFeedbackStatus::FAILED, .detail = config.detail()}; continue; } { lock_guard<mutex> lockInfoMap(mInfoMapMux); mInstanceConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(), .version = config.version(), .status = ConfigFeedbackStatus::APPLYING, .detail = config.detail()}; } ConfigFeedbackReceiver::GetInstance().RegisterInstanceConfig(config.name(), this); } } } bool CommonConfigProvider::FetchInstanceConfigFromServer( ::configserver::proto::v2::HeartbeatResponse& heartbeatResponse, ::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>& res) { configserver::proto::v2::FetchConfigRequest fetchConfigRequest; string requestID = CalculateRandomUUID(); fetchConfigRequest.set_request_id(requestID); fetchConfigRequest.set_instance_id(GetInstanceId()); for (const auto& config : heartbeatResponse.instance_config_updates()) { auto reqConfig = fetchConfigRequest.add_instance_configs(); reqConfig->set_name(config.name()); reqConfig->set_version(config.version()); } string operation = AGENT; operation.append("/FetchInstanceConfig"); string reqBody; fetchConfigRequest.SerializeToString(&reqBody); string fetchConfigResponse; if (SendHttpRequest( operation, reqBody, "FetchInstanceConfig", fetchConfigRequest.request_id(), fetchConfigResponse)) { configserver::proto::v2::FetchConfigResponse fetchConfigResponsePb; fetchConfigResponsePb.ParseFromString(fetchConfigResponse); res.Swap(fetchConfigResponsePb.mutable_instance_config_updates()); return true; } return false; } bool CommonConfigProvider::FetchPipelineConfigFromServer( ::configserver::proto::v2::HeartbeatResponse& heartbeatResponse, ::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>& res) { configserver::proto::v2::FetchConfigRequest fetchConfigRequest; string requestID = CalculateRandomUUID(); fetchConfigRequest.set_request_id(requestID); fetchConfigRequest.set_instance_id(GetInstanceId()); for (const auto& config : heartbeatResponse.continuous_pipeline_config_updates()) { auto reqConfig = fetchConfigRequest.add_continuous_pipeline_configs(); reqConfig->set_name(config.name()); reqConfig->set_version(config.version()); } string operation = AGENT; operation.append("/FetchPipelineConfig"); string reqBody; fetchConfigRequest.SerializeToString(&reqBody); string fetchConfigResponse; if (SendHttpRequest( operation, reqBody, "FetchPipelineConfig", fetchConfigRequest.request_id(), fetchConfigResponse)) { configserver::proto::v2::FetchConfigResponse fetchConfigResponsePb; fetchConfigResponsePb.ParseFromString(fetchConfigResponse); res.Swap(fetchConfigResponsePb.mutable_continuous_pipeline_config_updates()); return true; } return false; } void CommonConfigProvider::FeedbackContinuousPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status) { lock_guard<mutex> lockInfoMap(mInfoMapMux); auto info = mContinuousPipelineConfigInfoMap.find(name); if (info != mContinuousPipelineConfigInfoMap.end()) { info->second.status = status; } LOG_DEBUG(sLogger, ("CommonConfigProvider", "FeedbackContinuousPipelineConfigStatus")("name", name)("status", ToStringView(status))); } void CommonConfigProvider::FeedbackInstanceConfigStatus(const std::string& name, ConfigFeedbackStatus status) { lock_guard<mutex> lockInfoMap(mInfoMapMux); auto info = mInstanceConfigInfoMap.find(name); if (info != mInstanceConfigInfoMap.end()) { info->second.status = status; } LOG_DEBUG(sLogger, ("CommonConfigProvider", "FeedbackInstanceConfigStatus")("name", name)("status", ToStringView(status))); } void CommonConfigProvider::FeedbackOnetimePipelineConfigStatus(const std::string& type, const std::string& name, ConfigFeedbackStatus status) { lock_guard<mutex> lockInfoMap(mInfoMapMux); auto info = mOnetimePipelineConfigInfoMap.find(GenerateOnetimePipelineConfigFeedBackKey(type, name)); if (info != mOnetimePipelineConfigInfoMap.end()) { info->second.status = status; } LOG_DEBUG(sLogger, ("CommonConfigProvider", "FeedbackOnetimePipelineConfigStatus")("type", type)("name", name)("status", ToStringView(status))); } } // namespace logtail