remote/windows/PipeTransportServer.cpp (265 lines of code) (raw):

#include "PipeTransportServer.h" #include "PipeTransport.h" #include <cstring> #include "thrift/thrift-config.h" #include "thrift/TNonCopyable.h" #include <AccCtrl.h> #include <AclAPI.h> #include <sddl.h> #include "thrift/windows/OverlappedSubmissionThread.h" #include "thrift/windows/Sync.h" using namespace apache::thrift::transport; using namespace apache::thrift; using std::shared_ptr; namespace { // Windows - set security to allow non-elevated apps // to access pipes created by elevated apps. // Full access to everyone const std::string DEFAULT_PIPE_SECURITY{"D:(A;;FA;;;WD)"}; } class WindowsPipeServerImpl : apache::thrift::TNonCopyable { public: WindowsPipeServerImpl() {} virtual ~WindowsPipeServerImpl() {} virtual void interrupt() = 0; virtual std::shared_ptr<TTransport> acceptImpl() = 0; virtual HANDLE getPipeHandle() = 0; virtual HANDLE getWrtPipeHandle() = 0; virtual HANDLE getClientRdPipeHandle() = 0; virtual HANDLE getClientWrtPipeHandle() = 0; virtual HANDLE getNativeWaitHandle() { return nullptr; } }; class WindowsNamedPipeServer : public WindowsPipeServerImpl { public: WindowsNamedPipeServer(const std::string& pipename, uint32_t bufsize, uint32_t maxconnections, const std::string& securityDescriptor) : stopping_(false), pipename_(pipename), bufsize_(bufsize), maxconns_(maxconnections), securityDescriptor_(securityDescriptor) { connectOverlap_.action = TOverlappedWorkItem::CONNECT; cancelOverlap_.action = TOverlappedWorkItem::CANCELIO; TAutoCrit lock(pipe_protect_); initiateNamedConnect(lock); } virtual ~WindowsNamedPipeServer() {} virtual void interrupt() { TAutoCrit lock(pipe_protect_); cached_client_.reset(); if (Pipe_.h != INVALID_HANDLE_VALUE) { stopping_ = true; cancelOverlap_.h = Pipe_.h; // This should wake up GetOverlappedResult thread_->addWorkItem(&cancelOverlap_); } } virtual std::shared_ptr<TTransport> acceptImpl(); virtual HANDLE getPipeHandle() { return Pipe_.h; } virtual HANDLE getWrtPipeHandle() { return INVALID_HANDLE_VALUE; } virtual HANDLE getClientRdPipeHandle() { return INVALID_HANDLE_VALUE; } virtual HANDLE getClientWrtPipeHandle() { return INVALID_HANDLE_VALUE; } virtual HANDLE getNativeWaitHandle() { return listen_event_.h; } private: bool createNamedPipe(const TAutoCrit &lockProof); void initiateNamedConnect(const TAutoCrit &lockProof); TAutoOverlapThread thread_; TOverlappedWorkItem connectOverlap_; TOverlappedWorkItem cancelOverlap_; bool stopping_; std::string pipename_; std::string securityDescriptor_; uint32_t bufsize_; uint32_t maxconns_; TManualResetEvent listen_event_; TCriticalSection pipe_protect_; // only read or write these variables underneath a locked pipe_protect_ std::shared_ptr<PipeTransport> cached_client_; TAutoHandle Pipe_; }; HANDLE PipeTransportServer::getNativeWaitHandle() { if (impl_) return impl_->getNativeWaitHandle(); return nullptr; } //---- Constructors ---- PipeTransportServer::PipeTransportServer(const std::string& pipename, uint32_t bufsize) : bufsize_(bufsize), isAnonymous_(false) { setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT); setPipename(pipename); setSecurityDescriptor(DEFAULT_PIPE_SECURITY); } PipeTransportServer::PipeTransportServer(const std::string& pipename, uint32_t bufsize, uint32_t maxconnections) : bufsize_(bufsize), isAnonymous_(false) { setMaxConnections(maxconnections); setPipename(pipename); setSecurityDescriptor(DEFAULT_PIPE_SECURITY); } PipeTransportServer::PipeTransportServer(const std::string& pipename, uint32_t bufsize, uint32_t maxconnections, const std::string& securityDescriptor) : bufsize_(bufsize), isAnonymous_(false) { setMaxConnections(maxconnections); setPipename(pipename); setSecurityDescriptor(securityDescriptor); } PipeTransportServer::PipeTransportServer(const std::string& pipename) : bufsize_(1024), isAnonymous_(false) { setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT); setPipename(pipename); setSecurityDescriptor(DEFAULT_PIPE_SECURITY); } //---- Destructor ---- PipeTransportServer::~PipeTransportServer() {} bool PipeTransportServer::isOpen() const { return (impl_->getPipeHandle() != INVALID_HANDLE_VALUE); } //--------------------------------------------------------- // Transport callbacks //--------------------------------------------------------- void PipeTransportServer::listen() { if (isAnonymous_) return; impl_.reset(new WindowsNamedPipeServer(pipename_, bufsize_, maxconns_, securityDescriptor_)); } shared_ptr<TTransport> PipeTransportServer::acceptImpl() { return impl_->acceptImpl(); } void WindowsNamedPipeServer::initiateNamedConnect(const TAutoCrit &lockProof) { if (stopping_) return; if (!createNamedPipe(lockProof)) { throw TTransportException(TTransportException::NOT_OPEN, "PipeTransportServer CreateNamedPipe failed"); } // The prior connection has been handled, so close the gate ResetEvent(listen_event_.h); connectOverlap_.reset(nullptr, 0, listen_event_.h); connectOverlap_.h = Pipe_.h; thread_->addWorkItem(&connectOverlap_); // Wait for the client to connect; if it succeeds, the // function returns a nonzero value. If the function returns // zero, GetLastError should return ERROR_PIPE_CONNECTED. if (connectOverlap_.success) { cached_client_.reset(new PipeTransport(Pipe_, pipename_)); // make sure people know that a connection is ready SetEvent(listen_event_.h); return; } DWORD dwErr = connectOverlap_.last_error; switch (dwErr) { case ERROR_PIPE_CONNECTED: cached_client_.reset(new PipeTransport(Pipe_, pipename_)); // make sure people know that a connection is ready SetEvent(listen_event_.h); return; case ERROR_IO_PENDING: return; // acceptImpl will do the appropriate WaitForMultipleObjects default: throw TTransportException(TTransportException::NOT_OPEN, "PipeTransportServer ConnectNamedPipe failed, err: " + TOutput::strerror_s(dwErr)); } } shared_ptr<TTransport> WindowsNamedPipeServer::acceptImpl() { { TAutoCrit lock(pipe_protect_); if (cached_client_.get() != nullptr) { shared_ptr<PipeTransport> client; // zero out cached_client, since we are about to return it. client.swap(cached_client_); // kick off the next connection before returning initiateNamedConnect(lock); return client; // success! } } if (Pipe_.h == INVALID_HANDLE_VALUE) { throw TTransportException(TTransportException::NOT_OPEN, "WindowsNamedPipeServer: someone called accept on a closed pipe server."); } DWORD dwDummy = 0; // For the most part, Pipe_ should be protected with pipe_protect_. We can't // reasonably do that here though without breaking interruptability. However, // this should be safe, though I'm not happy about it. We only need to ensure // that no one writes / modifies Pipe_.h while we are reading it. Well, the // only two things that should be modifying Pipe_ are acceptImpl, the // functions it calls, and the destructor. Those things shouldn't be run // concurrently anyway. So this call is 'really' just a read that may happen // concurrently with interrupt, and that should be fine. if (GetOverlappedResult(Pipe_.h, &connectOverlap_.overlap, &dwDummy, TRUE)) { TAutoCrit lock(pipe_protect_); shared_ptr<PipeTransport> client; try { client.reset(new PipeTransport(Pipe_, pipename_)); } catch (TTransportException& ttx) { if (ttx.getType() == TTransportException::INTERRUPTED) { throw; } // kick off the next connection before throwing initiateNamedConnect(lock); throw TTransportException(TTransportException::CLIENT_DISCONNECT, ttx.what()); } // kick off the next connection before returning initiateNamedConnect(lock); return client; // success! } // if we got here, then we are in an error / shutdown case DWORD gle = GetLastError(); // save error before doing cleanup if(gle == ERROR_OPERATION_ABORTED) { TAutoCrit lock(pipe_protect_); // Needed to insure concurrent thread to be out of interrupt. throw TTransportException(TTransportException::INTERRUPTED, "PipeTransportServer: server interrupted, err: " + TOutput::strerror_s(gle)); } throw TTransportException(TTransportException::NOT_OPEN, "PipeTransportServer: client connection failed, err: " + TOutput::strerror_s(gle)); } void PipeTransportServer::interrupt() { if (impl_) impl_->interrupt(); } void PipeTransportServer::close() { impl_.reset(); } bool WindowsNamedPipeServer::createNamedPipe(const TAutoCrit& /*lockProof*/) { PSECURITY_DESCRIPTOR psd = nullptr; ULONG size = 0; if (!ConvertStringSecurityDescriptorToSecurityDescriptorA(securityDescriptor_.c_str(), SDDL_REVISION_1, &psd, &size)) { DWORD lastError = GetLastError(); throw TTransportException(TTransportException::NOT_OPEN, "PipeTransportServer::ConvertStringSecurityDescriptorToSecurityDescriptorA() failed, err:" + TOutput::strerror_s(lastError)); } SECURITY_ATTRIBUTES sa; sa.nLength = sizeof(SECURITY_ATTRIBUTES); sa.lpSecurityDescriptor = psd; sa.bInheritHandle = FALSE; // Create an instance of the named pipe TAutoHandle hPipe(CreateNamedPipeA(pipename_.c_str(), // pipe name PIPE_ACCESS_DUPLEX | // read/write access FILE_FLAG_OVERLAPPED, // async mode PIPE_TYPE_BYTE | // byte type pipe PIPE_READMODE_BYTE, // byte read mode maxconns_, // max. instances bufsize_, // output buffer size bufsize_, // input buffer size 0, // client time-out &sa)); // security attributes auto lastError = GetLastError(); if (psd) LocalFree(psd); if (hPipe.h == INVALID_HANDLE_VALUE) { Pipe_.reset(); throw TTransportException(TTransportException::NOT_OPEN, "TCreateNamedPipe() failed, err: " + TOutput::strerror_s(lastError)); } Pipe_.reset(hPipe.release()); return true; } //--------------------------------------------------------- // Accessors //--------------------------------------------------------- std::string PipeTransportServer::getPipename() { return pipename_; } void PipeTransportServer::setPipename(const std::string& pipename) { if (pipename.find("\\\\") == std::string::npos) pipename_ = "\\\\.\\pipe\\" + pipename; else pipename_ = pipename; } int PipeTransportServer::getBufferSize() { return bufsize_; } void PipeTransportServer::setBufferSize(int bufsize) { bufsize_ = bufsize; } HANDLE PipeTransportServer::getPipeHandle() { return impl_ ? impl_->getPipeHandle() : INVALID_HANDLE_VALUE; } HANDLE PipeTransportServer::getWrtPipeHandle() { return impl_ ? impl_->getWrtPipeHandle() : INVALID_HANDLE_VALUE; } HANDLE PipeTransportServer::getClientRdPipeHandle() { return impl_ ? impl_->getClientRdPipeHandle() : INVALID_HANDLE_VALUE; } HANDLE PipeTransportServer::getClientWrtPipeHandle() { return impl_ ? impl_->getClientWrtPipeHandle() : INVALID_HANDLE_VALUE; } bool PipeTransportServer::getAnonymous() { return isAnonymous_; } void PipeTransportServer::setAnonymous(bool anon) { isAnonymous_ = anon; } void PipeTransportServer::setSecurityDescriptor(const std::string& securityDescriptor) { securityDescriptor_ = securityDescriptor; } void PipeTransportServer::setMaxConnections(uint32_t maxconnections) { if (maxconnections == 0) maxconns_ = 1; else if (maxconnections > PIPE_UNLIMITED_INSTANCES) maxconns_ = PIPE_UNLIMITED_INSTANCES; else maxconns_ = maxconnections; }