lib/core/CDetachedProcessSpawner_Windows.cc (151 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <core/CDetachedProcessSpawner.h>
#include <core/CCondition.h>
#include <core/CLogger.h>
#include <core/CMutex.h>
#include <core/CScopedLock.h>
#include <core/CShellArgQuoter.h>
#include <core/CThread.h>
#include <core/CWindowsError.h>
#include <core/WindowsSafe.h>
#include <map>
namespace ml {
namespace core {
namespace detail {
class CTrackerThread : public CThread {
public:
using TPidHandleMap = std::map<CProcess::TPid, HANDLE>;
public:
CTrackerThread() : m_Shutdown(false), m_Condition(m_Mutex) {}
virtual ~CTrackerThread() {
// Close the handles to any child processes that outlived us
CScopedLock lock(m_Mutex);
for (const auto& entry : m_Pids) {
CloseHandle(entry.second);
}
}
//! Mutex is accessible so the code outside the class can avoid race
//! conditions.
CMutex& mutex() { return m_Mutex; }
//! Add a PID to track, together with its corresponding process handle.
void addPid(CProcess::TPid pid, HANDLE processHandle) {
CScopedLock lock(m_Mutex);
m_Pids.insert({pid, processHandle});
m_Condition.signal();
}
bool terminatePid(CProcess::TPid pid) {
HANDLE handle = this->handleForPid(pid);
if (handle == INVALID_HANDLE_VALUE) {
LOG_ERROR(<< "Will not attempt to kill process " << pid << ": not a child process");
return false;
}
UINT exitCode = 0;
if (TerminateProcess(handle, exitCode) == FALSE) {
LOG_ERROR(<< "Failed to kill process " << pid << ": " << CWindowsError());
return false;
}
return true;
}
//! Given a process ID, return the corresponding process handle.
HANDLE handleForPid(CProcess::TPid pid) const {
if (pid == 0) {
return INVALID_HANDLE_VALUE;
}
CScopedLock lock(m_Mutex);
// Do an extra cycle of waiting for zombies, so we give the most
// up-to-date answer possible
const_cast<CTrackerThread*>(this)->checkForDeadChildren();
auto iter = m_Pids.find(pid);
return iter == m_Pids.end() ? INVALID_HANDLE_VALUE : iter->second;
}
protected:
virtual void run() {
CScopedLock lock(m_Mutex);
while (!m_Shutdown) {
// Reap zombies every 50ms if child processes are running,
// otherwise wait for a child process to start.
if (m_Pids.empty()) {
m_Condition.wait();
} else {
m_Condition.wait(50);
}
this->checkForDeadChildren();
}
}
virtual void shutdown() {
LOG_DEBUG(<< "Shutting down spawned process tracker thread");
CScopedLock lock(m_Mutex);
m_Shutdown = true;
m_Condition.signal();
}
private:
//! Reap zombie child processes and adjust the set of live child PIDs
//! accordingly. MUST be called with m_Mutex locked.
void checkForDeadChildren() {
auto iter = m_Pids.begin();
while (iter != m_Pids.end()) {
// The reason for using WaitForSingleObject() here instead of
// WaitForMultipleObjects() (which would avoid the need to wait
// on a condition variable above) is that the latter function
// can only wait for 64 objects simultaneously. We could easily
// have more child processes than this, so it would lead to code
// complexity and headaches getting test coverage to use
// WaitForMultipleObjects().
HANDLE processHandle = iter->second;
if (WaitForSingleObject(processHandle, 0) == WAIT_OBJECT_0) {
CloseHandle(processHandle);
iter = m_Pids.erase(iter);
} else {
++iter;
}
}
}
private:
bool m_Shutdown;
TPidHandleMap m_Pids;
mutable CMutex m_Mutex;
CCondition m_Condition;
};
}
CDetachedProcessSpawner::CDetachedProcessSpawner(const TStrVec& permittedProcessPaths)
: m_PermittedProcessPaths(permittedProcessPaths),
m_TrackerThread(std::make_shared<detail::CTrackerThread>()) {
if (m_TrackerThread->start() == false) {
LOG_ERROR(<< "Failed to start spawned process tracker thread");
}
}
CDetachedProcessSpawner::~CDetachedProcessSpawner() {
if (m_TrackerThread->stop() == false) {
LOG_ERROR(<< "Failed to stop spawned process tracker thread");
}
}
bool CDetachedProcessSpawner::spawn(const std::string& processPath, const TStrVec& args) {
CProcess::TPid dummy(0);
return this->spawn(processPath, args, dummy);
}
bool CDetachedProcessSpawner::spawn(const std::string& processPath,
const TStrVec& args,
CProcess::TPid& childPid) {
if (std::find(m_PermittedProcessPaths.begin(), m_PermittedProcessPaths.end(),
processPath) == m_PermittedProcessPaths.end()) {
LOG_ERROR(<< "Spawning process '" << processPath << "' is not permitted");
return false;
}
bool processPathHasExeExt(processPath.length() > 4 &&
processPath.compare(processPath.length() - 4, 4, ".exe") == 0);
// Windows takes command lines as a single string
std::string cmdLine(CShellArgQuoter::quote(processPath));
for (size_t index = 0; index < args.size(); ++index) {
cmdLine += ' ';
cmdLine += CShellArgQuoter::quote(args[index]);
}
STARTUPINFO startupInfo;
::memset(&startupInfo, 0, sizeof(STARTUPINFO));
startupInfo.cb = sizeof(STARTUPINFO);
PROCESS_INFORMATION processInformation;
::memset(&processInformation, 0, sizeof(PROCESS_INFORMATION));
{
// Hold the tracker thread mutex until the PID is added to the tracker
// to avoid a race condition if the process is started but dies really
// quickly
CScopedLock lock(m_TrackerThread->mutex());
if (CreateProcess(
(processPathHasExeExt ? processPath : processPath + ".exe").c_str(),
const_cast<char*>(cmdLine.c_str()), 0, 0, FALSE,
// The CREATE_NO_WINDOW flag is used instead of
// DETACHED_PROCESS, as Windows does not create the file handles
// that underlie stdin, stdout and stderr if a process has no
// knowledge of any console. With CREATE_NO_WINDOW the process
// will not initially be attached to any console, but has the
// option to attach to its parent process's console later on,
// and this means the three standard file handles are created.
// None of this would be a problem if we redirected stderr using
// freopen(), but instead we redirect the underlying OS level
// file handles so that we can revert the redirection.
CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW, 0, 0, &startupInfo,
&processInformation) == FALSE) {
LOG_ERROR(<< "Failed to spawn '" << processPath << "': " << CWindowsError());
return false;
}
childPid = GetProcessId(processInformation.hProcess);
m_TrackerThread->addPid(childPid, processInformation.hProcess);
}
LOG_DEBUG(<< "Spawned '" << processPath << "' with PID " << childPid);
CloseHandle(processInformation.hThread);
return true;
}
bool CDetachedProcessSpawner::terminateChild(CProcess::TPid pid) {
return m_TrackerThread->terminatePid(pid);
}
bool CDetachedProcessSpawner::hasChild(CProcess::TPid pid) const {
return m_TrackerThread->handleForPid(pid) != INVALID_HANDLE_VALUE;
}
}
}