lib/core/CDetachedProcessSpawner.cc (222 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #include <core/CDetachedProcessSpawner.h> #include <core/CCondition.h> #include <core/CLogger.h> #include <core/CMutex.h> #include <core/CScopedLock.h> #include <core/CThread.h> #include <algorithm> #include <set> #include <errno.h> #include <fcntl.h> #include <signal.h> #include <spawn.h> #include <stdlib.h> #include <string.h> #include <sys/resource.h> #include <sys/stat.h> #include <sys/wait.h> #include <unistd.h> // environ is a global variable from the C runtime library extern char** environ; namespace { //! Maximum number of newly opened files between calls to setupFileActions(). const int MAX_NEW_OPEN_FILES{10}; //! Attempt to close all file descriptors except the standard ones. The //! standard file descriptors will be reopened on /dev/null in the spawned //! process. Returns false and sets errno if the actions cannot be initialised //! at all, but other errors are ignored. bool setupFileActions(posix_spawn_file_actions_t* fileActions, int& maxFdHint) { if (::posix_spawn_file_actions_init(fileActions) != 0) { return false; } struct rlimit rlim; ::memset(&rlim, 0, sizeof(struct rlimit)); if (::getrlimit(RLIMIT_NOFILE, &rlim) != 0) { rlim.rlim_cur = 36; // POSIX default } // Assume only a handful of new files have been opened since the last time // this function was called. Doing this means we learn the practical limit // on the number of open files, which will be a lot less than the enforced // limit, and avoids making masses of expensive fcntl() calls. int maxFdToTest{std::min(static_cast<int>(rlim.rlim_cur), maxFdHint + MAX_NEW_OPEN_FILES)}; for (int fd = 0; fd <= maxFdToTest; ++fd) { if (fd == STDIN_FILENO) { ::posix_spawn_file_actions_addopen(fileActions, fd, "/dev/null", O_RDONLY, S_IRUSR); maxFdHint = fd; } else if (fd == STDOUT_FILENO || fd == STDERR_FILENO) { ::posix_spawn_file_actions_addopen(fileActions, fd, "/dev/null", O_WRONLY, S_IWUSR); maxFdHint = fd; } else { // Close other files that are open. There is a race condition here, // in that files could be opened or closed between this code running // and the posix_spawn() function being called. However, this would // violate the restrictions stated in the contract detailed in the // Doxygen description of this class. if (::fcntl(fd, F_GETFL) != -1) { ::posix_spawn_file_actions_addclose(fileActions, fd); maxFdHint = fd; } } } return true; } } namespace ml { namespace core { namespace detail { class CTrackerThread : public CThread { public: using TPidSet = std::set<CProcess::TPid>; public: CTrackerThread() : m_Shutdown(false), m_Condition(m_Mutex) {} //! Mutex is accessible so the code outside the class can avoid race //! conditions. CMutex& mutex() { return m_Mutex; } //! Add a PID to track. void addPid(CProcess::TPid pid) { CScopedLock lock(m_Mutex); m_Pids.insert(pid); m_Condition.signal(); } bool terminatePid(CProcess::TPid pid) { if (!this->havePid(pid)) { LOG_ERROR(<< "Will not attempt to kill process " << pid << ": not a child process"); return false; } if (::kill(pid, SIGTERM) == -1) { // Don't log an error if the process exited normally in between // checking whether it was our child process and killing it if (errno != ESRCH) { LOG_ERROR(<< "Failed to kill process " << pid << ": " << ::strerror(errno)); } else { // But log at debug in case there's a bug in this area LOG_DEBUG(<< "No such process while trying to kill PID " << pid); } return false; } return true; } bool havePid(CProcess::TPid pid) const { if (pid <= 0) { return false; } CScopedLock lock(m_Mutex); // Do an extra cycle of waiting for zombies, so we give the most // up-to-date answer possible const_cast<CTrackerThread*>(this)->checkForDeadChildren(); return m_Pids.find(pid) != m_Pids.end(); } protected: void run() override { CScopedLock lock(m_Mutex); while (!m_Shutdown) { // Reap zombies every 50ms if child processes are running, // otherwise wait for a child process to start. if (m_Pids.empty()) { m_Condition.wait(); } else { m_Condition.wait(50); } this->checkForDeadChildren(); } } void shutdown() override { LOG_DEBUG(<< "Shutting down spawned process tracker thread"); CScopedLock lock(m_Mutex); m_Shutdown = true; m_Condition.signal(); } private: //! Reap zombie child processes and adjust the set of live child PIDs //! accordingly. MUST be called with m_Mutex locked. void checkForDeadChildren() { int status = 0; for (;;) { CProcess::TPid pid = ::waitpid(-1, &status, WNOHANG); // 0 means there are child processes but none have died if (pid == 0) { break; } // -1 means error if (pid == -1) { if (errno != EINTR) { break; } } else { if (WIFSIGNALED(status)) { int signal = WTERMSIG(status); if (signal == SIGTERM) { // We expect this when a job is force-closed, so log // at a lower level LOG_INFO(<< "Child process with PID " << pid << " was terminated by signal " << signal); } else if (signal == SIGKILL) { // This should never happen if the system is working // normally - possible reasons are the Linux OOM // killer or manual intervention. The latter is highly unlikely // if running in the cloud. LOG_ERROR(<< "Child process with PID " << pid << " was terminated by signal 9 (SIGKILL)." << " This is likely due to the OOM killer." << " Please check system logs for more details."); } else { // This should never happen if the system is working // normally - possible reasons are bugs that cause // access violations or manual intervention. The latter is highly unlikely // if running in the cloud. LOG_ERROR(<< "Child process with PID " << pid << " was terminated by signal " << signal << " Please check system logs for more details."); } } else { int exitCode = WEXITSTATUS(status); if (exitCode == 0) { // This is the happy case LOG_DEBUG(<< "Child process with PID " << pid << " has exited"); } else { LOG_WARN(<< "Child process with PID " << pid << " has exited with exit code " << exitCode); } } m_Pids.erase(pid); } } } private: bool m_Shutdown; TPidSet m_Pids; mutable CMutex m_Mutex; CCondition m_Condition; }; } CDetachedProcessSpawner::CDetachedProcessSpawner(const TStrVec& permittedProcessPaths) : m_PermittedProcessPaths(permittedProcessPaths), m_TrackerThread(std::make_shared<detail::CTrackerThread>()) { if (m_TrackerThread->start() == false) { LOG_ERROR(<< "Failed to start spawned process tracker thread"); } } CDetachedProcessSpawner::~CDetachedProcessSpawner() { if (m_TrackerThread->stop() == false) { LOG_ERROR(<< "Failed to stop spawned process tracker thread"); } } bool CDetachedProcessSpawner::spawn(const std::string& processPath, const TStrVec& args) { CProcess::TPid dummy(0); return this->spawn(processPath, args, dummy); } bool CDetachedProcessSpawner::spawn(const std::string& processPath, const TStrVec& args, CProcess::TPid& childPid) { if (std::find(m_PermittedProcessPaths.begin(), m_PermittedProcessPaths.end(), processPath) == m_PermittedProcessPaths.end()) { LOG_ERROR(<< "Spawning process '" << processPath << "' is not permitted"); return false; } if (::access(processPath.c_str(), X_OK) != 0) { LOG_ERROR(<< "Cannot execute '" << processPath << "': " << ::strerror(errno)); return false; } using TCharPVec = std::vector<char*>; // Size of argv is two bigger than the number of arguments because: // 1) We add the program name at the beginning // 2) The list of arguments must be terminated by a NULL pointer TCharPVec argv; argv.reserve(args.size() + 2); // These const_casts may cause const data to get modified BUT only in the // child post-fork, so this won't corrupt parent process data argv.push_back(const_cast<char*>(processPath.c_str())); for (size_t index = 0; index < args.size(); ++index) { argv.push_back(const_cast<char*>(args[index].c_str())); } argv.push_back(static_cast<char*>(nullptr)); posix_spawn_file_actions_t fileActions; if (setupFileActions(&fileActions, m_MaxObservedFd) == false) { LOG_ERROR(<< "Failed to set up file actions prior to spawn of '" << processPath << "': " << ::strerror(errno)); return false; } posix_spawnattr_t spawnAttributes; if (::posix_spawnattr_init(&spawnAttributes) != 0) { LOG_ERROR(<< "Failed to set up spawn attributes prior to spawn of '" << processPath << "': " << ::strerror(errno)); return false; } ::posix_spawnattr_setflags(&spawnAttributes, POSIX_SPAWN_SETPGROUP); { // Hold the tracker thread mutex until the PID is added to the tracker // to avoid a race condition if the process is started but dies really // quickly CScopedLock lock(m_TrackerThread->mutex()); int err(::posix_spawn(&childPid, processPath.c_str(), &fileActions, &spawnAttributes, &argv[0], environ)); ::posix_spawn_file_actions_destroy(&fileActions); ::posix_spawnattr_destroy(&spawnAttributes); if (err != 0) { LOG_ERROR(<< "Failed to spawn '" << processPath << "': " << ::strerror(err)); return false; } m_TrackerThread->addPid(childPid); } LOG_DEBUG(<< "Spawned '" << processPath << "' with PID " << childPid); return true; } bool CDetachedProcessSpawner::terminateChild(CProcess::TPid pid) { return m_TrackerThread->terminatePid(pid); } bool CDetachedProcessSpawner::hasChild(CProcess::TPid pid) const { return m_TrackerThread->havePid(pid); } } }