aios/apps/facility/build_service/build_service/worker/AgentServiceImpl.cpp (889 lines of code) (raw):

/* * Copyright 2014-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "build_service/worker/AgentServiceImpl.h" #include <assert.h> #include <cstdint> #include <errno.h> #include <exception> #include <ext/alloc_traits.h> #include <functional> #include <iosfwd> #include <iterator> #include <memory> #include <signal.h> #include <stdio.h> #include <string.h> #include <sys/stat.h> #include <sys/wait.h> #include <unistd.h> #include "autil/CRC32C.h" #include "autil/EnvUtil.h" #include "autil/Log.h" #include "autil/StringUtil.h" #include "autil/TimeUtility.h" #include "autil/legacy/legacy_jsonizable.h" #include "build_service/common/ConfigDownloader.h" #include "build_service/common/CpuSpeedEstimater.h" #include "build_service/common/NetworkTrafficEstimater.h" #include "build_service/proto/ProtoComparator.h" #include "build_service/proto/ProtoUtil.h" #include "build_service/proto/RoleNameGenerator.h" #include "build_service/util/Monitor.h" #include "fslib/fs/FileSystem.h" #include "fslib/util/FileUtil.h" #include "indexlib/file_system/ErrorCode.h" #include "indexlib/file_system/FSResult.h" #include "indexlib/file_system/fslib/FslibWrapper.h" #include "indexlib/util/PathUtil.h" #include "kmonitor/client/MetricType.h" using namespace std; using namespace autil; using namespace build_service::proto; namespace build_service { namespace worker { BS_LOG_SETUP(worker, AgentServiceImpl); #define STDOUT_FILE_NAME "stdout.out" #define STDERR_FILE_NAME "stderr.out" namespace { const int64_t WORKER_THREAD_LOOP_INTERVAL_US = autil::EnvUtil::getEnv("bs_agent_thread_loop_interval", (int64_t)(5 * 1000 * 1000)); // 5s const int64_t AGENT_INNER_RESTART_COUNT_THRESHOLD = autil::EnvUtil::getEnv("bs_agent_restart_count_threshold", (int64_t)10); const int64_t AGENT_INNER_RESTART_COUNT_RESET_INTERVAL = autil::EnvUtil::getEnv("bs_agent_restart_count_reset_interval", (int64_t)600); // 10min const bool AGENT_FORK_WITH_NEW_SESSION = autil::EnvUtil::getEnv("bs_agent_fork_with_new_session", true); const int64_t AGENT_USELESS_DIR_EXPIRE_TIME_IN_SEC = autil::EnvUtil::getEnv("bs_agent_useless_dir_expire_time", (int64_t)(24 * 60 * 60)); // 24h const int64_t AGENT_CLEAN_USELESS_DIR_INTERVAL = autil::EnvUtil::getEnv("bs_agent_clean_useless_dir_interval", (int64_t)600); // 10min const size_t AGENT_MAX_KEEP_USELESS_DIR_COUNT = autil::EnvUtil::getEnv("bs_agent_max_keep_useless_dir_count", (size_t)256); // 256 } // namespace AgentServiceImpl::AgentServiceImpl(const std::map<std::string, std::string>& procArgMap, const PartitionId& pid, indexlib::util::MetricProviderPtr metricProvider, const LongAddress& address, const string& appZkRoot, const string& adminServiceName) : WorkerStateHandler(pid, metricProvider, appZkRoot, adminServiceName, "") , _procArgMap(procArgMap) , _loopThreadInterval(WORKER_THREAD_LOOP_INTERVAL_US) , _totalRestartCnt(0) , _latestRestartTimestamp(-1) , _latestCleanDirTimestamp(-1) { _agentRoleName = RoleNameGenerator::generateRoleName(pid); *_current.mutable_longaddress() = address; _targetRoleCntMetric = DECLARE_METRIC(_metricProvider, "targetRoleCount", kmonitor::STATUS, "count"); _subProcessCntMetric = DECLARE_METRIC(_metricProvider, "subProcessCount", kmonitor::STATUS, "count"); _totalRestartCntMetric = DECLARE_METRIC(_metricProvider, "totalRestartCount", kmonitor::STATUS, "count"); _startProcessQps = DECLARE_METRIC(_metricProvider, "startProcessQps", kmonitor::QPS, "count"); _startProcessFailQps = DECLARE_METRIC(_metricProvider, "startProcessFailQps", kmonitor::QPS, "count"); _stopProcessQps = DECLARE_METRIC(_metricProvider, "stopProcessQps", kmonitor::QPS, "count"); _deadProcessQps = DECLARE_METRIC(_metricProvider, "deadProcessQps", kmonitor::QPS, "count"); } AgentServiceImpl::~AgentServiceImpl() { _workerStatusUpdaterThread.reset(); _latestConfigUpdaterThread.reset(); _cleanUselessDirThread.reset(); autil::ScopedLock lock(_processStatusLock); for (auto iter = _processStatus.begin(); iter != _processStatus.end(); iter++) { if (!stopProcess(iter->second)) { AUTIL_LOG(ERROR, "stop process roleName [%s], procName [%s], pid [%d] failed", iter->first.first.c_str(), iter->first.second.c_str(), iter->second); } } } bool AgentServiceImpl::init() { _latestCleanDirTimestamp = autil::TimeUtility::currentTimeInSeconds(); prepareGlobalEnvironMap(); if (!fslib::util::FileUtil::getCurrentPath(_workDir)) { AUTIL_LOG(ERROR, "get cwd path fail."); return false; } _binaryPath = getBinaryPath(); _workerStatusUpdaterThread = autil::LoopThread::createLoopThread( std::bind(&AgentServiceImpl::updateWorkerStatus, this), _loopThreadInterval, "roleStatUpd"); if (!_workerStatusUpdaterThread) { AUTIL_LOG(ERROR, "create worker status updater thread fail"); return false; } _latestConfigUpdaterThread = autil::LoopThread::createLoopThread( std::bind(&AgentServiceImpl::updateLatestConfig, this), _loopThreadInterval, "configUpd"); if (!_latestConfigUpdaterThread) { AUTIL_LOG(ERROR, "create latest config updater thread fail"); return false; } _cleanUselessDirThread = autil::LoopThread::createLoopThread(std::bind(&AgentServiceImpl::cleanUselessDir, this), _loopThreadInterval, "cleanUseless"); if (!_cleanUselessDirThread) { AUTIL_LOG(ERROR, "create clean useless dir thread fail"); return false; } bool ret = _cpuEstimater.Start(/*sampleCountLimit=*/5, /*checkInterval=*/60); if (!ret) { return false; } ret = _networkEstimater.Start(); _current.set_status(proto::WS_STARTED); return ret; } void AgentServiceImpl::doHandleTargetState(const string& state, bool hasResourceUpdated) { proto::AgentTarget target; if (!target.ParseFromString(state)) { BS_LOG(ERROR, "invalid agent target string: %s", state.c_str()); return; } if (_current.targetstatus() == target) { return; } BS_LOG(INFO, "target status: %s", target.ShortDebugString().c_str()); if (updateAppPlan(target)) { ScopedLock lock(_lock); *_current.mutable_targetstatus() = target; } else { FILL_ERRORINFO(BUILDER_ERROR_UNKNOWN, "update plan for target [" + target.ShortDebugString() + "]failed", BS_RETRY); setFatalError(); } } bool AgentServiceImpl::updateAppPlan(const proto::AgentTarget& target) { std::vector<std::string> targetConfigs; auto addTargetConfig = [&](const std::string& configPath) { if (!configPath.empty()) { targetConfigs.push_back(configPath); } }; if (target.has_configpath()) { addTargetConfig(target.configpath()); } for (size_t i = 0; i < target.globalconfigs_size(); i++) { addTargetConfig(target.globalconfigs(i)); } if (!targetConfigs.empty()) { _latestConfigGuard.set(std::make_shared<ConfigDownloadInfo>(std::make_pair(targetConfigs, false))); } // 1. 生成所有的启动命令、参数 std::map<ProcessKey, ProcessInfo> processPlan; if (!createProcessPlan(target, processPlan)) { return false; } bool pidChange = false; autil::ScopedLock lock(_processStatusLock); _targetProcessPlan = processPlan; REPORT_METRIC(_targetRoleCntMetric, _targetProcessPlan.size()); reuseCurrentProc(pidChange); // 2. 停掉plan里没有的 for (auto iter = _processStatus.begin(); iter != _processStatus.end();) { if (processPlan.end() == processPlan.find(iter->first)) { pid_t pid = iter->second; if (stopProcess(pid) || !existProcess(pid)) { pidChange = true; AUTIL_LOG(INFO, "stop worker[%s] procName[%s], pid[%d] success", iter->first.first.c_str(), iter->first.second.c_str(), pid); iter = _processStatus.erase(iter); } else { AUTIL_LOG(ERROR, "stop worker[%s] procName[%s], pid[%d] failed", iter->first.first.c_str(), iter->first.second.c_str(), pid); ++iter; } } else { ++iter; } } // 3. 去掉已经起来的 for (auto iter = processPlan.begin(); iter != processPlan.end();) { if (_processStatus.end() != _processStatus.find(iter->first)) { iter = processPlan.erase(iter); } else { ++iter; } } // 4. 启动剩下的 for (auto iter = processPlan.begin(); iter != processPlan.end(); ++iter) { pid_t pid = startProcess(iter->first.first, iter->first.second, iter->second.cmd, iter->second.configPath, iter->second.envs); if (pid < 0) { AUTIL_LOG(ERROR, "start worker[%s], procName[%s] failed", iter->first.first.c_str(), iter->first.second.c_str()); INCREASE_QPS(_startProcessFailQps); continue; } _processStatus[iter->first] = pid; pidChange = true; } if (pidChange) { updatePidFile(); } return true; } pid_t AgentServiceImpl::startProcess(const std::string& roleName, const std::string& procName, const std::string& cmd, const std::string& configPath, const std::vector<std::pair<std::string, std::string>>& envs) { auto workDir = getRoleWorkDir(roleName, procName); if (!createWorkDir(workDir)) { AUTIL_LOG(ERROR, "create workdir failed for process:[%s], " "workdir:[%s]", cmd.c_str(), workDir.c_str()); return -1; } if (procName == "build_service_worker") { prepareBinarySymLink(workDir); prepareConfigSymLink(workDir, configPath); } pid_t pid; if ((pid = fork()) == -1) { return -1; } if (pid == 0) { // child process if (AGENT_FORK_WITH_NEW_SESSION) { setsid(); } else { setpgid(0, 0); } (void)chdir(workDir.c_str()); auto envParams = setEnvForSubProcess(workDir, envs); auto cmdParams = autil::StringUtil::split(cmd, " "); for (int i = 0; i < cmdParams.size(); ++i) { cmdParams[i] = autil::EnvUtil::envReplace(cmdParams[i]); } const char* pathname = cmdParams[0].c_str(); char** argsList = makeArgsList(cmdParams); char** envList = makeArgsList(envParams); fclose(stdout); fclose(stderr); umask(0); string pidSuffixStr = "." + StringUtil::toString(getpid()); string stdoutFile = string(STDOUT_FILE_NAME) + pidSuffixStr; string stderrFile = string(STDERR_FILE_NAME) + pidSuffixStr; (void)freopen(stdoutFile.c_str(), "w+", stdout); (void)freopen(stderrFile.c_str(), "w+", stderr); execve(pathname, argsList, envList); fprintf(stderr, "command(%s) is not execve successful, error code: %d(%s)\n", cmd.c_str(), errno, strerror(errno)); fflush(stdout); fflush(stderr); _exit(-1); // error occurred } AUTIL_LOG(INFO, "start process. cmd:[%s], pid:[%d], workdir:[%s]", cmd.c_str(), pid, workDir.c_str()); INCREASE_QPS(_startProcessQps); return pid; } bool AgentServiceImpl::stopProcess(pid_t pid) { return killProcess(pid, SIGTERM); } bool AgentServiceImpl::killProcess(pid_t pid, int32_t signal) { assert(pid > 1); if (kill(-pid, signal) != 0) { int ec = errno; AUTIL_LOG(ERROR, "kill process failed. pid:[%d], sig:[%d], " "errno:[%d], errmsg:[%s]", pid, signal, ec, strerror(ec)); return false; } AUTIL_LOG(INFO, "stop process [%d] by sig [%d]", pid, signal); INCREASE_QPS(_stopProcessQps); return true; } bool AgentServiceImpl::existProcess(pid_t pid) { return (0 == kill(pid, 0)); // process exists } void AgentServiceImpl::updateLatestConfig() { std::shared_ptr<ConfigDownloadInfo> latestConfigPtr; _latestConfigGuard.get(latestConfigPtr); if (latestConfigPtr == nullptr || latestConfigPtr->first.empty() || latestConfigPtr->second) { return; } bool success = true; for (auto remoteConfigPath : latestConfigPtr->first) { auto localConfigPath = fslib::util::FileUtil::joinFilePath( fslib::util::FileUtil::joinFilePath(_workDir, "config"), getLocalConfigDirName(remoteConfigPath)); common::ConfigDownloader::DownloadErrorCode errorCode = common::ConfigDownloader::downloadConfig(remoteConfigPath, localConfigPath); if (errorCode == common::ConfigDownloader::DEC_NORMAL_ERROR || errorCode == common::ConfigDownloader::DEC_DEST_ERROR) { success = false; BS_LOG(WARN, "downloadConfig from %s failed", remoteConfigPath.c_str()); } } latestConfigPtr->second = success; if (success) { ScopedLock lock(_lock); _current.set_serviceready(true); } } void AgentServiceImpl::cleanUselessDir() { auto currentTimestamp = autil::TimeUtility::currentTimeInSeconds(); if (currentTimestamp - _latestCleanDirTimestamp < AGENT_CLEAN_USELESS_DIR_INTERVAL) { return; } _latestCleanDirTimestamp = currentTimestamp; AUTIL_LOG(INFO, "begin clean useless config and role dir"); std::shared_ptr<ConfigDownloadInfo> latestConfigPtr; _latestConfigGuard.get(latestConfigPtr); std::set<std::string> inUseConfigNames; if (latestConfigPtr) { for (auto remoteConfigPath : latestConfigPtr->first) { inUseConfigNames.insert(getLocalConfigDirName(remoteConfigPath)); } } std::set<std::string> inUseRoleNames; { autil::ScopedLock lock(_processStatusLock); for (auto iter = _processStatus.begin(); iter != _processStatus.end(); iter++) { inUseRoleNames.insert(iter->first.first); } for (auto iter = _targetProcessPlan.begin(); iter != _targetProcessPlan.end(); iter++) { inUseRoleNames.insert(iter->first.first); } } fillInUseLinkConfigDirNames(inUseRoleNames, inUseConfigNames); removeUselessDir(fslib::util::FileUtil::joinFilePath(_workDir, "target_roles"), inUseRoleNames); removeUselessDir(fslib::util::FileUtil::joinFilePath(_workDir, "config"), inUseConfigNames); } void AgentServiceImpl::fillInUseLinkConfigDirNames(const std::set<std::string>& inUseRoleNames, std::set<std::string>& inUseConfigNames) { string srcConfigRoot = fslib::util::FileUtil::joinFilePath(_workDir, "config"); for (auto& roleName : inUseRoleNames) { string configDir = fslib::util::FileUtil::joinFilePath(getRoleWorkDir(roleName, "build_service_worker"), "config"); fslib::FileList fileList; fslib::fs::FileSystem::listDir(configDir, fileList); for (auto item : fileList) { string configPath = fslib::util::FileUtil::joinFilePath(configDir, item); if (!fslib::util::FileUtil::isLink(configPath)) { continue; } string linkPath = fslib::util::FileUtil::readLink(configPath); if (linkPath.empty()) { continue; } string reletivePath; if (!indexlib::util::PathUtil::GetRelativePath(srcConfigRoot, linkPath, reletivePath)) { AUTIL_LOG(ERROR, "getReletivePath failed, parent path [%s], config path [%s]", srcConfigRoot.c_str(), linkPath.c_str()); } vector<string> dirNames = autil::StringUtil::split(reletivePath, "/"); if (!dirNames.empty()) { AUTIL_LOG(INFO, "find in-use dir [%s] in config root [%s].", dirNames[0].c_str(), srcConfigRoot.c_str()); inUseConfigNames.insert(dirNames[0]); } } } } void AgentServiceImpl::removeUselessDir(const std::string& rootDir, const std::set<std::string>& inUseSubDirNames) { fslib::RichFileList fileList; fslib::fs::FileSystem::listDir(rootDir, fileList); auto currentTimestamp = autil::TimeUtility::currentTimeInSeconds(); vector<string> toRemoveExpireSubDirs; vector<string> toRemoveUnExpireSubDirs; extractToRemoveSubDirs(fileList, inUseSubDirNames, currentTimestamp, AGENT_USELESS_DIR_EXPIRE_TIME_IN_SEC, AGENT_MAX_KEEP_USELESS_DIR_COUNT, toRemoveExpireSubDirs, toRemoveUnExpireSubDirs); removeTargetSubDirs(rootDir, toRemoveExpireSubDirs, toRemoveUnExpireSubDirs); } void AgentServiceImpl::extractToRemoveSubDirs(const fslib::RichFileList& fileList, const std::set<std::string>& inUseSubDirNames, int64_t currentTimestamp, int64_t expireTimeInSec, size_t maxUselessDirCount, std::vector<std::string>& toRemoveExpiredSubDirs, std::vector<std::string>& toRemoveUnExpiredSubDirs) { std::vector<std::pair<std::string, uint64_t>> unexpiredUselessDirs; for (auto& item : fileList) { if (inUseSubDirNames.find(item.path) != inUseSubDirNames.end()) { continue; } if (item.lastModifyTime + expireTimeInSec >= currentTimestamp) { unexpiredUselessDirs.push_back(std::make_pair(item.path, item.lastModifyTime)); continue; } toRemoveExpiredSubDirs.push_back(item.path); } if (unexpiredUselessDirs.size() > maxUselessDirCount) { AUTIL_LOG(INFO, "trigger remove useless dir for useless dir count over [%lu]", maxUselessDirCount); std::sort(unexpiredUselessDirs.begin(), unexpiredUselessDirs.end(), [](const std::pair<std::string, uint64_t>& lft, const std::pair<std::string, uint64_t>& rht) { return lft.second < rht.second; }); size_t toRemoveDirCount = unexpiredUselessDirs.size() - maxUselessDirCount; for (size_t i = 0; i < toRemoveDirCount; i++) { toRemoveUnExpiredSubDirs.push_back(unexpiredUselessDirs[i].first); } } } void AgentServiceImpl::removeTargetSubDirs(const std::string& rootDir, std::vector<std::string>& toRemoveExpireSubDirs, std::vector<std::string>& toRemoveUnExpireSubDirs) { for (auto& dir : toRemoveExpireSubDirs) { string toRemoveDirPath = fslib::util::FileUtil::joinFilePath(rootDir, dir); AUTIL_LOG(INFO, "remove useless dir [%s] for over expired time [%ld]", toRemoveDirPath.c_str(), AGENT_USELESS_DIR_EXPIRE_TIME_IN_SEC); if (!fslib::util::FileUtil::remove(toRemoveDirPath)) { AUTIL_LOG(ERROR, "remove dir [%s] failed", toRemoveDirPath.c_str()); } } for (auto& dir : toRemoveUnExpireSubDirs) { string toRemoveDirPath = fslib::util::FileUtil::joinFilePath(rootDir, dir); AUTIL_LOG(INFO, "remove useless dir [%s] for over max keep count [%lu]", toRemoveDirPath.c_str(), AGENT_MAX_KEEP_USELESS_DIR_COUNT); if (!fslib::util::FileUtil::remove(toRemoveDirPath)) { AUTIL_LOG(ERROR, "remove dir [%s] failed", toRemoveDirPath.c_str()); } } } void AgentServiceImpl::updateWorkerStatus() { pid_t pid = waitpid(-1, NULL, WNOHANG); if (pid > 0) { AUTIL_LOG(INFO, "waitpid return[%d]", pid); } AUTIL_LOG(INFO, "begin update role status"); autil::ScopedLock lock(_processStatusLock); bool pidChange = false; for (auto iter = _processStatus.begin(); iter != _processStatus.end();) { auto pid = iter->second; auto targetIter = _targetProcessPlan.find(iter->first); if (targetIter == _targetProcessPlan.end()) { // should stop if (stopProcess(pid) || !existProcess(pid)) { pidChange = true; AUTIL_LOG(INFO, "stop useless worker[%s], procName [%s], pid[%d] success", iter->first.first.c_str(), iter->first.second.c_str(), pid); iter = _processStatus.erase(iter); } else { AUTIL_LOG(ERROR, "stop worker[%s], procName [%s], pid[%d] failed", iter->first.first.c_str(), iter->first.second.c_str(), pid); ++iter; } continue; } if (existProcess(pid)) { ++iter; continue; } pidChange = true; AUTIL_LOG(INFO, "process [%d] for worker [%s] procName [%s] is dead unexpectedly.", pid, iter->first.first.c_str(), iter->first.second.c_str()); INCREASE_QPS(_deadProcessQps); iter = _processStatus.erase(iter); } for (auto iter = _targetProcessPlan.begin(); iter != _targetProcessPlan.end(); iter++) { if (_processStatus.find(iter->first) != _processStatus.end()) { continue; } AUTIL_LOG(INFO, "find dead role [%s], procName [%s], try to start it.", iter->first.first.c_str(), iter->first.second.c_str()); _totalRestartCnt++; _latestRestartTimestamp = autil::TimeUtility::currentTimeInSeconds(); REPORT_METRIC(_totalRestartCntMetric, _totalRestartCnt); pid_t pid = startProcess(iter->first.first, iter->first.second, iter->second.cmd, iter->second.configPath, iter->second.envs); if (pid < 0) { AUTIL_LOG(ERROR, "try to start worker[%s], procName[%s] failed", iter->first.first.c_str(), iter->first.second.c_str()); INCREASE_QPS(_startProcessFailQps); continue; } pidChange = true; _processStatus[iter->first] = pid; } if (pidChange) { updatePidFile(); } auto currentTimeInSec = autil::TimeUtility::currentTimeInSeconds(); if (_latestRestartTimestamp > 0 && (currentTimeInSec - _latestRestartTimestamp) > AGENT_INNER_RESTART_COUNT_RESET_INTERVAL) { AUTIL_LOG(INFO, "reset total restart count, latest restart timestamp [%ld]", _latestRestartTimestamp); _totalRestartCnt = 0; _latestRestartTimestamp = -1; REPORT_METRIC(_totalRestartCntMetric, _totalRestartCnt); } } void AgentServiceImpl::updatePidFile() { string pidFile = fslib::util::FileUtil::joinFilePath(_workDir, "pids"); std::vector<ProcessData> pidDataVec; for (auto iter = _processStatus.begin(); iter != _processStatus.end(); ++iter) { pidDataVec.emplace_back(ProcessData::make(iter->first.first, iter->first.second, iter->second)); } std::string pidContent = autil::legacy::ToJsonString(pidDataVec, true); REPORT_METRIC(_subProcessCntMetric, _processStatus.size()); AUTIL_LOG(INFO, "write pid file[%s:%s]", pidFile.c_str(), pidContent.c_str()); auto errorCode = fslib::fs::FileSystem::writeFile(pidFile, pidContent); if (fslib::EC_OK != errorCode) { AUTIL_LOG(ERROR, "write pid file[%s:%s] fail, error[%d]", pidFile.c_str(), pidContent.c_str(), errorCode); } } void AgentServiceImpl::reuseCurrentProc(bool& bPidChanged) { string originPidFile = fslib::util::FileUtil::joinFilePath(_workDir, "pids"); string originPidContent; auto error = fslib::fs::FileSystem::readFile(originPidFile, originPidContent); if (fslib::EC_NOENT == error) { return; } if (fslib::EC_OK != error) { AUTIL_LOG(ERROR, "read file[%s] fail, error[%d]", originPidFile.c_str(), error); return; } AUTIL_LOG(INFO, "pids[%s] content[%s]", originPidFile.c_str(), originPidContent.c_str()); bool invalidFormat = false; std::vector<ProcessData> pidDataVec; try { autil::legacy::FromJsonString(pidDataVec, originPidContent); } catch (const std::exception& e) { AUTIL_LOG(ERROR, "From json string failed, content[%s], exception[%s]", originPidContent.c_str(), e.what()); invalidFormat = true; } catch (...) { AUTIL_LOG(ERROR, "From json string failed, content[%s]", originPidContent.c_str()); invalidFormat = true; } if (invalidFormat && !parseLegacyPidFile(originPidContent, pidDataVec)) { return; } for (const auto& pidData : pidDataVec) { if (!existProcess(pidData.pid)) { continue; } auto key = std::make_pair(pidData.roleName, pidData.procName); auto iter = _processStatus.find(key); if (iter == _processStatus.end()) { _processStatus[key] = pidData.pid; bPidChanged = true; AUTIL_LOG(INFO, "find exist role [%s] procName [%s]", pidData.roleName.c_str(), pidData.procName.c_str()); continue; } if (iter->second == pidData.pid) { continue; } bPidChanged = true; if (existProcess(iter->second)) { AUTIL_LOG(WARN, "stop duplicated legacy role [%s], pid [%d]", pidData.roleName.c_str(), pidData.pid); stopProcess(pidData.pid); } else { AUTIL_LOG(INFO, "use exist role [%s]", pidData.roleName.c_str()); iter->second = pidData.pid; } } } void AgentServiceImpl::getCurrentState(std::string& state) { ScopedLock lock(_lock); fillProtocolVersion(_current); fillCpuSpeed(_current); fillNetworkTraffic(_current); saveCurrent(_current, state); } bool AgentServiceImpl::hasFatalError() { ScopedLock lock(_lock); return _hasFatalError || _totalRestartCnt > AGENT_INNER_RESTART_COUNT_THRESHOLD; } std::string AgentServiceImpl::getBinaryPath() const { // for SimpleMasterSchedulerLocal auto binaryPath = autil::EnvUtil::getEnv("BINARY_PATH_FOR_BS_LOCAL"); if (!binaryPath.empty()) { return binaryPath; } char buffer[2048]; int ret = readlink("/proc/self/exe", buffer, sizeof(buffer) - 1); return fslib::util::FileUtil::getParentDir(std::string(buffer, ret)); } std::string AgentServiceImpl::getRoleWorkDir(const std::string& roleName, const std::string& procName) const { string reletivePath = "target_roles/" + roleName + "/" + procName; return fslib::util::FileUtil::joinFilePath(_workDir, reletivePath); } bool AgentServiceImpl::createWorkDir(const std::string& dir) { fslib::ErrorCode ec = fslib::fs::FileSystem::isExist(dir); if (fslib::EC_TRUE == ec) { return true; } if (fslib::EC_FALSE == ec) { ec = fslib::fs::FileSystem::mkDir(dir, true); if (ec != fslib::EC_OK) { AUTIL_LOG(ERROR, "create dir failed. dir:[%s], error:[%s]", dir.c_str(), fslib::fs::FileSystem::getErrorString(ec).c_str()); return false; } return true; } return false; } void AgentServiceImpl::prepareBinarySymLink(const std::string& workDir) { auto targetBinaryPath = fslib::util::FileUtil::joinFilePath(fslib::util::FileUtil::parentPath(workDir), "binary"); fslib::ErrorCode ec = fslib::fs::FileSystem::isExist(targetBinaryPath); if (fslib::EC_TRUE == ec && fslib::util::FileUtil::isLink(targetBinaryPath)) { if (!fslib::util::FileUtil::remove(targetBinaryPath)) { AUTIL_LOG(ERROR, "remove link dir [%s] failed", targetBinaryPath.c_str()); return; } } auto srcBinaryPath = fslib::util::FileUtil::joinFilePath(fslib::util::FileUtil::parentPath(_workDir), "binary"); ec = fslib::fs::FileSystem::isExist(srcBinaryPath); if (fslib::EC_TRUE != ec) { AUTIL_LOG(WARN, "src binary path [%s] not exist, ignore prepare sym-link.", srcBinaryPath.c_str()); return; } if (fslib::util::FileUtil::isLink(srcBinaryPath)) { string linkPath = fslib::util::FileUtil::readLink(srcBinaryPath); if (!linkPath.empty()) { srcBinaryPath = linkPath; } } if (indexlib::file_system::FslibWrapper::SymLink(srcBinaryPath, targetBinaryPath) != indexlib::file_system::FSEC_OK) { AUTIL_LOG(ERROR, "create sym-link from binary path [%s] to path [%s] fail.", srcBinaryPath.c_str(), targetBinaryPath.c_str()); return; } AUTIL_LOG(INFO, "create sym-link from binary path [%s] to path [%s] success.", srcBinaryPath.c_str(), targetBinaryPath.c_str()); } void AgentServiceImpl::prepareConfigSymLink(const std::string& workDir, const std::string& configPath) { if (configPath.empty()) { AUTIL_LOG(INFO, "no need prepare sym-link for config path"); return; } auto srcConfigRoot = fslib::util::FileUtil::joinFilePath(fslib::util::FileUtil::joinFilePath(_workDir, "config"), getLocalConfigDirName(configPath)); auto srcConfigPath = common::ConfigDownloader::getLocalConfigPath(configPath, srcConfigRoot); fslib::ErrorCode ec = fslib::fs::FileSystem::isExist(srcConfigPath); if (fslib::EC_TRUE != ec) { AUTIL_LOG(WARN, "latest config path [%s] not exist, ignore prepare sym-link.", srcConfigPath.c_str()); return; } auto targetConfigRoot = fslib::util::FileUtil::joinFilePath(workDir, "config"); ec = fslib::fs::FileSystem::isExist(targetConfigRoot); if (fslib::EC_FALSE == ec) { ec = fslib::fs::FileSystem::mkDir(targetConfigRoot, true); if (ec != fslib::EC_OK) { AUTIL_LOG(ERROR, "create dir failed. dir:[%s], error:[%s]", targetConfigRoot.c_str(), fslib::fs::FileSystem::getErrorString(ec).c_str()); return; } } auto targetConfigPath = common::ConfigDownloader::getLocalConfigPath(configPath, targetConfigRoot); ec = fslib::fs::FileSystem::isExist(targetConfigPath); if (fslib::EC_TRUE == ec) { AUTIL_LOG(INFO, "target config path [%s] already exist, ignore prepare sym-link.", targetConfigPath.c_str()); return; } if (indexlib::file_system::FslibWrapper::SymLink(srcConfigPath, targetConfigPath) != indexlib::file_system::FSEC_OK) { AUTIL_LOG(ERROR, "create sym-link from config path [%s] to path [%s] fail.", srcConfigPath.c_str(), targetConfigPath.c_str()); return; } AUTIL_LOG(INFO, "create sym-link from config path [%s] to path [%s] success.", srcConfigPath.c_str(), targetConfigPath.c_str()); } std::vector<std::string> AgentServiceImpl::setEnvForSubProcess(const std::string& workDir, const std::vector<hippo::PairType>& envs) { std::map<std::string, std::string> targetEnvMap = _globalEnvironMap; autil::EnvUtil::setEnv("HIPPO_PROC_WORKDIR", workDir, true); AUTIL_LOG(INFO, "set env [HIPPO_PROC_WORKDIR] = %s", workDir.c_str()); targetEnvMap["HIPPO_PROC_WORKDIR"] = workDir; for (auto& env : envs) { auto normalizeEnvValue = normalizeEnvArguments(targetEnvMap, env.second); auto rValue = autil::EnvUtil::envReplace(normalizeEnvValue); autil::EnvUtil::setEnv(env.first, rValue); AUTIL_LOG(INFO, "set env [%s] = %s", env.first.c_str(), rValue.c_str()); targetEnvMap[env.first] = rValue; } AUTIL_LOG(INFO, "set env [BUILD_SERVICE_AGENT_ROLE_NAME] = %s", _agentRoleName.c_str()); targetEnvMap["BUILD_SERVICE_AGENT_ROLE_NAME"] = _agentRoleName; std::vector<std::string> envStrVec; for (auto& item : targetEnvMap) { envStrVec.push_back(item.first + "=" + item.second); } return envStrVec; } bool AgentServiceImpl::createProcessPlan(const proto::AgentTarget& target, std::map<ProcessKey, ProcessInfo>& plan) const { map<std::string, std::string> targetRoleConfigMap; map<std::string, std::vector<hippo::ProcessInfo>> targetRoleInfoMap; for (size_t i = 0; i < target.targetroles_size(); i++) { const string& roleName = target.targetroles(i).rolename(); const string& procInfoStr = target.targetroles(i).processinfo(); string configPath; if (target.has_configpath()) { configPath = target.configpath(); } if (target.targetroles(i).has_configpath()) { configPath = target.targetroles(i).configpath(); } if (!configPath.empty()) { targetRoleConfigMap[roleName] = configPath; } std::vector<hippo::ProcessInfo>& procInfos = targetRoleInfoMap[roleName]; try { autil::legacy::FromJsonString(procInfos, procInfoStr); } catch (const std::exception& e) { AUTIL_LOG(ERROR, "From json string failed, content[%s], exception[%s]", procInfoStr.c_str(), e.what()); return false; } catch (...) { AUTIL_LOG(ERROR, "From json string failed, content[%s]", procInfoStr.c_str()); return false; } } assert(plan.empty()); set<string> keys = {"-z", "-r", "-s", "-m", "-a"}; for (const auto& item : targetRoleInfoMap) { const string& roleName = item.first; const auto& processInfos = item.second; auto cIter = targetRoleConfigMap.find(roleName); for (size_t i = 0; i < processInfos.size(); i++) { const hippo::ProcessInfo& processInfo = processInfos[i]; map<string, string> newArgs; if (processInfo.name == "build_service_worker") { newArgs = _procArgMap; for (size_t j = 0; j < processInfo.args.size(); j++) { if (keys.find(processInfo.args[j].first) != keys.end()) { newArgs[processInfo.args[j].first] = processInfo.args[j].second; } } newArgs["-w"] = getRoleWorkDir(roleName, processInfo.name); } else { for (size_t j = 0; j < processInfo.args.size(); j++) { newArgs[processInfo.args[j].first] = processInfo.args[j].second; } } string args; for (auto& item : newArgs) { args += " "; args += item.first; args += " "; args += item.second; } auto key = std::make_pair(roleName, processInfo.name); ProcessInfo pInfo; pInfo.cmd = _binaryPath + "/" + processInfo.cmd + args; pInfo.envs = processInfo.envs; if (cIter != targetRoleConfigMap.end()) { pInfo.configPath = cIter->second; } auto iter = plan.find(key); if (iter != plan.end()) { AUTIL_LOG(ERROR, "Duplicated procName [%s] for role [%s] is found, " "will ignore current process [%s]", processInfo.name.c_str(), roleName.c_str(), autil::legacy::ToJsonString(processInfo, true).c_str()); continue; } plan[key] = pInfo; } } return true; } bool AgentServiceImpl::parseLegacyPidFile(const std::string& content, std::vector<ProcessData>& pidDatas) { const vector<string>& originPids = autil::StringUtil::split(content, ","); for (const string& pidStr : originPids) { int pid; if (!autil::StringUtil::fromString(pidStr, pid)) { return false; } if (getpid() == pid) { continue; } if (!existProcess(pid)) { continue; } string srcFile = "/proc/" + pidStr + "/cmdline"; string processCmd; auto error = fslib::fs::FileSystem::readFile(srcFile, processCmd); if (fslib::EC_OK != error) { continue; } string roleName; AgentServiceImpl::parseCmdline(processCmd, roleName); if (roleName.empty()) { continue; } pidDatas.emplace_back(ProcessData::make(roleName, "build_service_worker", pid)); } return true; } void AgentServiceImpl::parseCmdline(std::string& cmdline, std::string& roleName) { for (int i = 0; i < cmdline.size(); i++) { if ('\0' == cmdline[i]) { cmdline[i] = ' '; } } const vector<string>& cmdVectors = autil::StringUtil::split(cmdline, " "); for (int i = 0; i < cmdVectors.size(); i++) { if (cmdVectors[i] == "-r") { if (i + 1 < cmdVectors.size()) { roleName = cmdVectors[++i]; // increase i here break; } } } } void AgentServiceImpl::prepareGlobalEnvironMap() { string srcFile = "/proc/" + autil::StringUtil::toString(getpid()) + "/environ"; string processEnv; fslib::fs::FileSystem::readFile(srcFile, processEnv); for (int i = 0; i < processEnv.size(); i++) { if ('\0' == processEnv[i]) { processEnv[i] = '\t'; } } const vector<string>& envStrVec = autil::StringUtil::split(processEnv, "\t"); for (auto envStr : envStrVec) { auto pos = envStr.find("="); if (pos == string::npos) { AUTIL_LOG(ERROR, "invalid env str [%s]", envStr.c_str()); continue; } _globalEnvironMap[envStr.substr(0, pos)] = envStr.substr(pos + 1); } } char** AgentServiceImpl::makeArgsList(const std::vector<std::string>& args) { if (args.empty()) { return nullptr; } size_t listSize = args.size(); char** res = new char*[listSize + 1]; for (size_t i = 0; i < listSize; ++i) { size_t strSize = args[i].size(); res[i] = new char[strSize + 1]; strncpy(res[i], args[i].data(), strSize); res[i][strSize] = '\0'; } res[listSize] = nullptr; return res; } std::string AgentServiceImpl::normalizeEnvArguments(const std::map<std::string, std::string>& envMap, const std::string& rawString) { if (rawString.find("$") == std::string::npos) { return rawString; } std::string normalizeStr = rawString; // use reverse iterator to avoid prefix match { HIPPO_APP & HIPPO_APP_INST_ROOT } for (auto iter = envMap.rbegin(); iter != envMap.rend(); iter++) { std::string oldStr = "$" + iter->first; std::string newStr = "${" + iter->first + "}"; autil::StringUtil::replaceAll(normalizeStr, oldStr, newStr); } if (rawString != normalizeStr) { AUTIL_LOG(INFO, "normalize env string from [%s] to [%s]", rawString.c_str(), normalizeStr.c_str()); } return normalizeStr; } std::string AgentServiceImpl::getLocalConfigDirName(const std::string& configPath) { uint32_t configPathCrc = autil::CRC32C::Value(configPath.c_str(), configPath.size()); return autil::StringUtil::toString(configPathCrc); } }} // namespace build_service::worker