remote/windows/PipeTransport.cpp (183 lines of code) (raw):

#include "PipeTransport.h"
#include "thrift/transport/TTransportException.h"
#include "thrift/windows/Sync.h"

#include <cstring>
#include <string>
#include <utility>

// Blocking read/write helpers built on overlapped pipe I/O. Both issue the
// operation with an OVERLAPPED structure bound to `event` and then wait for
// its result. `pipename_` is used only to build error messages.
uint32_t pseudo_sync_read(const std::string& pipename_, HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len);
void pseudo_sync_write(const std::string& pipename_, HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len);

// Abstract interface for a pipe implementation held by PipeTransport.
// read/write and the primary handle accessors are pure virtual; the remaining
// hooks have defaults that report "nothing there" (INVALID_HANDLE_VALUE /
// false / no-op).
class WindowsPipeImpl : apache::thrift::TNonCopyable {
public:
  WindowsPipeImpl() {}
  virtual ~WindowsPipeImpl() {}
  virtual uint32_t read(uint8_t* buf, uint32_t len) = 0;
  virtual void write(const uint8_t* buf, uint32_t len) = 0;
  virtual HANDLE getPipeHandle() = 0; // doubles as the read handle for anon pipe
  virtual void setPipeHandle(HANDLE pipehandle) = 0;
  virtual HANDLE getWrtPipeHandle() { return INVALID_HANDLE_VALUE; }
  virtual void setWrtPipeHandle(HANDLE) {}
  virtual bool isBufferedDataAvailable() { return false; }
  virtual HANDLE getNativeWaitHandle() { return INVALID_HANDLE_VALUE; }
};

// Named-pipe implementation: one pipe handle plus separate events for the
// read and write directions, so the two helpers never share an event.
class WindowsNamedPipeImpl : public WindowsPipeImpl {
public:
  // Takes the handle out of `pipehandle` (via release()) and stores it in
  // Pipe_. `pipename` is kept for error messages.
  explicit WindowsNamedPipeImpl(TAutoHandle& pipehandle, std::string pipename)
    : Pipe_(pipehandle.release()), pipename_(std::move(pipename)) {}
  ~WindowsNamedPipeImpl() override {}

  uint32_t read(uint8_t* buf, uint32_t len) override {
    return pseudo_sync_read(pipename_, Pipe_.h, read_event_.h, buf, len);
  }
  void write(const uint8_t* buf, uint32_t len) override {
    pseudo_sync_write(pipename_, Pipe_.h, write_event_.h, buf, len);
  }

  HANDLE getPipeHandle() override { return Pipe_.h; }
  void setPipeHandle(HANDLE pipehandle) override { Pipe_.reset(pipehandle); }

private:
  TManualResetEvent read_event_;
  TManualResetEvent write_event_;
  TAutoHandle Pipe_;
  std::string pipename_;
};

// Writes exactly `len` bytes from `buf` to `pipe`, looping over partial
// writes. Throws TTransportException(UNKNOWN) if WriteFile fails with anything
// other than ERROR_IO_PENDING, or if GetOverlappedResult fails.
void pseudo_sync_write(const std::string& pipename_, HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len) {
  OVERLAPPED tempOverlap;
  std::memset(&tempOverlap, 0, sizeof(tempOverlap));
  tempOverlap.hEvent = event;

  uint32_t written = 0;
  while (written < len) {
    BOOL result = ::WriteFile(pipe, buf + written, len - written, nullptr, &tempOverlap);
    if (result == FALSE) {
      // Capture the error once, immediately after the failing call.
      const DWORD err = ::GetLastError();
      if (err != ERROR_IO_PENDING) {
        throw TTransportException(TTransportException::UNKNOWN,
                                  "PipeTransport: WriteFile '" + pipename_ + "' failed, err: ["
                                      + std::to_string(err) + "] " + TOutput::strerror_s(err));
      }
    }

    DWORD bytes = 0;
    // Final argument TRUE: wait for the overlapped operation to finish.
    result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
    if (!result) {
      const DWORD err = ::GetLastError();
      throw TTransportException(TTransportException::UNKNOWN,
                                "PipeTransport: GetOverlappedResult '" + pipename_ + "' failed, err: ["
                                    + std::to_string(err) + "] " + TOutput::strerror_s(err));
    }
    written += bytes;
  }
}

// Issues a single read of up to `len` bytes into `buf` and returns the number
// of bytes reported by GetOverlappedResult. ERROR_BROKEN_PIPE is reported as
// TTransportException(END_OF_FILE); any other failure as UNKNOWN.
uint32_t pseudo_sync_read(const std::string& pipename_, HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len) {
  OVERLAPPED tempOverlap;
  std::memset(&tempOverlap, 0, sizeof(tempOverlap));
  tempOverlap.hEvent = event;

  BOOL result = ::ReadFile(pipe, buf, len, nullptr, &tempOverlap);
  int err = ::GetLastError();
  if (result == FALSE && err != ERROR_IO_PENDING) {
    if (err == ERROR_BROKEN_PIPE) {
      throw TTransportException(TTransportException::END_OF_FILE,
                                "PipeTransport: ReadFile '" + pipename_ + "' failed, err: broken pipe");
    }
    throw TTransportException(TTransportException::UNKNOWN,
                              "PipeTransport: ReadFile '" + pipename_ + "' failed, err: ["
                                  + std::to_string(err) + "] " + TOutput::strerror_s(err));
  }

  DWORD bytes = 0;
  // Final argument TRUE: wait for the overlapped operation to finish.
  result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
  if (!result) {
    err = ::GetLastError();
    if (err == ERROR_BROKEN_PIPE) {
      throw TTransportException(TTransportException::END_OF_FILE,
                                "PipeTransport: GetOverlappedResult '" + pipename_ + "' failed, err: broken pipe");
    }
    throw TTransportException(TTransportException::UNKNOWN,
                              "PipeTransport: GetOverlappedResult '" + pipename_ + "' failed, err: ["
                                  + std::to_string(err) + "] " + TOutput::strerror_s(err));
  }
  return bytes;
}

//---- Constructors ----
// All constructors start with a 3 second connect timeout and isAnonymous_
// false. The base class is listed first in each initializer list because it is
// initialized first regardless of where it is written.

PipeTransport::PipeTransport(const char* pipename, std::shared_ptr<TConfiguration> config)
  : TVirtualTransport(config), TimeoutSeconds_(3), isAnonymous_(false) {
  setPipename(pipename);
}

PipeTransport::PipeTransport(const std::string& pipename, std::shared_ptr<TConfiguration> config)
  : TVirtualTransport(config), TimeoutSeconds_(3), isAnonymous_(false) {
  setPipename(pipename);
}

// Wraps an already-open pipe handle; the handle is taken out of `Pipe`.
// Note: the name is stored as given, without going through setPipename().
PipeTransport::PipeTransport(TAutoHandle& Pipe, const std::string& pipename)
  : TVirtualTransport(nullptr),
    impl_(new WindowsNamedPipeImpl(Pipe, pipename)),
    pipename_(pipename),
    TimeoutSeconds_(3),
    isAnonymous_(false) {
}

PipeTransport::PipeTransport(std::shared_ptr<TConfiguration> config)
  : TVirtualTransport(config), TimeoutSeconds_(3), isAnonymous_(false) {
}

PipeTransport::~PipeTransport() {
}

//---------------------------------------------------------
// Transport callbacks
//---------------------------------------------------------

// The transport counts as open whenever an implementation object exists.
bool PipeTransport::isOpen() const {
  return impl_.get() != nullptr;
}

bool PipeTransport::peek() {
  return isOpen();
}

// Connects to the named pipe `pipename_`. No-op if already open. While the
// pipe reports ERROR_PIPE_BUSY, waits up to TimeoutSeconds_ via WaitNamedPipeA
// and retries. Throws TTransportException(NOT_OPEN) on any other CreateFile
// error or when the wait itself fails.
void PipeTransport::open() {
  if (isOpen())
    return;

  TAutoHandle hPipe;
  do {
    DWORD flags = FILE_FLAG_OVERLAPPED; // async mode, so we can do reads at the same time as writes
    hPipe.reset(CreateFileA(pipename_.c_str(),
                            GENERIC_READ | GENERIC_WRITE,
                            0,             // no sharing
                            nullptr,       // default security attributes
                            OPEN_EXISTING, // opens existing pipe
                            flags,
                            nullptr));     // no template file

    if (hPipe.h != INVALID_HANDLE_VALUE)
      break; // success!

    // Capture the error once, immediately after the failing call.
    const int err = static_cast<int>(::GetLastError());
    if (err != ERROR_PIPE_BUSY) {
      throw TTransportException(TTransportException::NOT_OPEN,
                                "PipeTransport::open CreateFile '" + pipename_ + "' failed, err: "
                                    + TOutput::strerror_s(err));
    }
  } while (::WaitNamedPipeA(pipename_.c_str(), TimeoutSeconds_ * 1000));

  if (hPipe.h == INVALID_HANDLE_VALUE) {
    // Only reachable when WaitNamedPipeA returned failure, so the last error
    // belongs to the wait, not to CreateFile.
    const int err = static_cast<int>(::GetLastError());
    throw TTransportException(TTransportException::NOT_OPEN,
                              "PipeTransport::open WaitNamedPipe '" + pipename_ + "' failed, err: "
                                  + TOutput::strerror_s(err));
  }

  impl_.reset(new WindowsNamedPipeImpl(hPipe, pipename_));
}

// Drops the implementation object, which makes isOpen() return false.
void PipeTransport::close() {
  impl_.reset();
}

// Reads up to `len` bytes into `buf`. Throws NOT_OPEN if the pipe is not open.
uint32_t PipeTransport::read(uint8_t* buf, uint32_t len) {
  checkReadBytesAvailable(len);
  if (!isOpen()) {
    throw TTransportException(TTransportException::NOT_OPEN,
                              "PipeTransport::read '" + pipename_ + "' is called read on non-open pipe.");
  }
  return impl_->read(buf, len);
}

// Writes `len` bytes from `buf`. Throws NOT_OPEN if the pipe is not open.
void PipeTransport::write(const uint8_t* buf, uint32_t len) {
  if (!isOpen()) {
    throw TTransportException(TTransportException::NOT_OPEN,
                              "PipeTransport::write '" + pipename_ + "' is called write on non-open pipe.");
  }
  impl_->write(buf, len);
}

//---------------------------------------------------------
// Accessors
//---------------------------------------------------------

std::string PipeTransport::getPipename() {
  return pipename_;
}

// Stores the pipe name, prefixing "\\.\pipe\" when the given name contains no
// double backslash.
void PipeTransport::setPipename(const std::string& pipename) {
  if (pipename.find("\\\\") == std::string::npos)
    pipename_ = "\\\\.\\pipe\\" + pipename;
  else
    pipename_ = pipename;
}

HANDLE PipeTransport::getPipeHandle() {
  if (impl_)
    return impl_->getPipeHandle();
  return INVALID_HANDLE_VALUE;
}

// For a named pipe, wraps `pipehandle` in a fresh WindowsNamedPipeImpl. For an
// anonymous pipe, forwards to the existing implementation; like the sibling
// accessors this is a no-op when there is no implementation (previously a
// null dereference).
void PipeTransport::setPipeHandle(HANDLE pipehandle) {
  if (isAnonymous_) {
    if (impl_)
      impl_->setPipeHandle(pipehandle);
  } else {
    TAutoHandle pipe(pipehandle);
    impl_.reset(new WindowsNamedPipeImpl(pipe, pipename_));
  }
}

HANDLE PipeTransport::getWrtPipeHandle() {
  if (impl_)
    return impl_->getWrtPipeHandle();
  return INVALID_HANDLE_VALUE;
}

void PipeTransport::setWrtPipeHandle(HANDLE pipehandle) {
  if (impl_)
    impl_->setWrtPipeHandle(pipehandle);
}

HANDLE PipeTransport::getNativeWaitHandle() {
  if (impl_)
    return impl_->getNativeWaitHandle();
  return INVALID_HANDLE_VALUE;
}

// Connect timeout, in seconds (multiplied by 1000 for WaitNamedPipeA in open()).
long PipeTransport::getConnTimeout() {
  return TimeoutSeconds_;
}

void PipeTransport::setConnTimeout(long seconds) {
  TimeoutSeconds_ = seconds;
}