watchman/cppclient/WatchmanConnection.cpp (303 lines of code) (raw):

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

#include "WatchmanConnection.h"

#include <cstdlib>

#include <folly/ExceptionWrapper.h>
#include <folly/SocketAddress.h>
#include <folly/executors/InlineExecutor.h>
#include <folly/experimental/bser/Bser.h>

#ifdef _WIN32
#include <eden/fs/utils/SpawnedProcess.h> // @manual
#else
#include <folly/Subprocess.h> // @manual
#endif

namespace watchman {

using namespace folly::bser;
using namespace folly;

#ifdef _WIN32
using facebook::eden::SpawnedProcess;
#endif

// Ordered with the most likely kind first
static const std::vector<dynamic> kUnilateralLabels{"subscription", "log"};

static const dynamic kError("error");
static const dynamic kCapabilities("capabilities");

// We'll just dispatch bser decodes and callbacks inline unless they
// give us an alternative environment
static InlineExecutor inlineExecutor;

// Stores the supplied configuration. When no cpuExecutor is given, the
// file-local inline executor is used for decoding and callbacks.
WatchmanConnection::WatchmanConnection(
    EventBase* eventBase,
    std::optional<std::string>&& sockPath,
    std::optional<WatchmanConnection::Callback>&& callback,
    Executor* cpuExecutor)
    : eventBase_(eventBase),
      sockPath_(std::move(sockPath)),
      callback_(std::move(callback)),
      cpuExecutor_(cpuExecutor ? cpuExecutor : &inlineExecutor),
      versionCmd_(nullptr),
      bufQ_(IOBufQueue::cacheChainLength()) {
  CHECK_NOTNULL(eventBase);
}

WatchmanConnection::~WatchmanConnection() {
  close();
}

// Resolves the path of the watchman socket. Sources are tried in order:
// the explicitly configured path, the WATCHMAN_SOCK environment variable,
// and finally the output of `watchman get-sockname` (run on cpuExecutor_).
// The CLI path throws WatchmanError if the process exits non-zero.
folly::Future<std::string> WatchmanConnection::getSockPath() {
  // Take explicit configuration first
  if (sockPath_.has_value()) {
    return makeFuture(sockPath_.value());
  }

  // Else use the environmental variable used by watchman to report
  // the active socket path
  auto var = std::getenv("WATCHMAN_SOCK");
  if (var && *var) {
    return makeFuture(std::string(var));
  }

  return via(cpuExecutor_, [] {
    // Else discover it from the CLI
#ifdef _WIN32
    SpawnedProcess::Options options;
    options.pipeStdout();
    options.pipeStderr();
    SpawnedProcess proc{
        {"watchman", "--output-encoding=bser", "get-sockname"},
        std::move(options)};
#else
    folly::Subprocess proc(
        {"watchman", "--output-encoding=bser", "get-sockname"},
        folly::Subprocess::Options().pipeStdout().pipeStderr().usePath());
#endif
    auto out_pair = proc.communicate();
    auto returnCode = proc.wait();
    if (returnCode.exitStatus() != 0) {
      throw WatchmanError{folly::to<std::string>(
          "`watchman get-sockname` returned error code ",
          returnCode.exitStatus(),
#ifndef _WIN32
          " when called as user ",
          geteuid(),
#endif
          ". Error: ",
          out_pair.second)};
    }
    auto result = parseBser(out_pair.first);
    // Recent versions of watchman include both `unix_domain` and `sockname`
    // fields, however older versions - such as v4.9.0, included in Ubuntu
    // 20.04 - only define `sockname`.
    //
    // Prefer the newer, more specific 'unix_domain', but fall back to
    // 'sockname'.
    if (result.count("unix_domain")) {
      return result["unix_domain"].asString();
    }
    return result["sockname"].asString();
  });
}

// Resolves the socket path, then opens the socket on the event base thread.
// The returned future is the one backing connectPromise_, which is fulfilled
// from connectSuccess() / connectErr().
// Throws WatchmanError synchronously if versionArgs is not an object.
Future<dynamic> WatchmanConnection::connect(folly::dynamic versionArgs) {
  if (!versionArgs.isObject()) {
    throw WatchmanError("versionArgs must be object");
  }
  versionCmd_ = folly::dynamic::array("version", versionArgs);

  auto res = getSockPath().thenValue(
      [shared_this = shared_from_this()](std::string&& path) {
        shared_this->eventBase_->runInEventBaseThread([=] {
          folly::SocketAddress addr;
          addr.setFromPath(path);

          shared_this->sock_ =
              folly::AsyncSocket::newSocket(shared_this->eventBase_.get());
          shared_this->sock_->connect(shared_this.get(), addr);
        });

        return shared_this->connectPromise_.getFuture();
      });
  return res;
}

// Drops the socket (if any) and fails every queued command. Subsequent calls
// are no-ops because closing_ is already set.
void WatchmanConnection::close() {
  if (closing_) {
    return;
  }
  closing_ = true;
  if (sock_) {
    eventBase_->runImmediatelyOrRunInEventBaseThreadAndWait([this] {
      // This implicitly closes the connection without flushing outstanding
      // writes. Should be fine as Watchman is mostly just providing info, so
      // an incomplete partial write isn't a problem. Doing a fully flushing
      // close here might be the cause of a deadlock accessing the event base.
      sock_.reset();
    });
  }
  failQueuedCommands(make_exception_wrapper<WatchmanError>(
      "WatchmanConnection::close() was called"));
}

// The convention for Watchman responses is that they represent
// an error if they contain the "error" key. We want to report
// those as exceptions, but it is easier to do that via a Try
Try<dynamic> WatchmanConnection::watchmanResponseToTry(dynamic&& value) {
  auto error = value.get_ptr(kError);
  if (error) {
    return Try<dynamic>(make_exception_wrapper<WatchmanResponseError>(value));
  }
  return Try<dynamic>(std::move(value));
}

// Called when the socket connect succeeds. Installs this object as the read
// callback, then issues the version command and fulfils connectPromise_ with
// its outcome.
void WatchmanConnection::connectSuccess() noexcept {
  try {
    sock_->setReadCB(this);
    sock_->setCloseOnExec();

    run(versionCmd_)
        .thenValue([shared_this = shared_from_this()](dynamic&& result) {
          // If there is no "capabilities" key then the version of
          // watchman is too old; treat this as an error
          if (!result.get_ptr(kCapabilities)) {
            result["error"] =
                "This watchman server has no support for capabilities, "
                "please upgrade to the current stable version of watchman";
            shared_this->connectPromise_.setTry(
                shared_this->watchmanResponseToTry(std::move(result)));
            return;
          }
          shared_this->connectPromise_.setValue(std::move(result));
        })
        .thenError([shared_this = shared_from_this()](
                       const folly::exception_wrapper& e) {
          shared_this->connectPromise_.setException(e);
        });
  } catch (const std::exception& e) {
    connectPromise_.setException(
        folly::exception_wrapper(std::current_exception(), e));
  } catch (...) {
    connectPromise_.setException(
        folly::exception_wrapper(std::current_exception()));
  }
}

// Called when the socket connect fails; propagates the error to the
// future returned by connect().
void WatchmanConnection::connectErr(
    const folly::AsyncSocketException& ex) noexcept {
  connectPromise_.setException(ex);
}

WatchmanConnection::QueuedCommand::QueuedCommand(const dynamic& command)
    : cmd(command) {}

// Queues `command` and returns a future for its response. The future is
// failed immediately if the connection is broken or there is no socket.
Future<dynamic> WatchmanConnection::run(const dynamic& command) noexcept {
  auto cmd = std::make_shared<QueuedCommand>(command);
  if (broken_) {
    cmd->promise.setException(WatchmanError("The connection was broken"));
    return cmd->promise.getFuture();
  }
  if (!sock_) {
    cmd->promise.setException(WatchmanError(
        "No socket (did you call connect() and check result for exceptions?)"));
    return cmd->promise.getFuture();
  }

  bool shouldWrite;
  {
    std::lock_guard<std::mutex> g(mutex_);
    // We only need to call sendCommand if we don't have a command in
    // progress; the completion handler will trigger it once we receive
    // the response
    shouldWrite = commandQ_.empty();
    commandQ_.push_back(cmd);
  }

  if (shouldWrite) {
    eventBase_->runInEventBaseThread(
        [shared_this = shared_from_this()] { shared_this->sendCommand(); });
  }

  return cmd->promise.getFuture();
}

// Generate a failure for all queued commands and mark the connection broken.
//
// The queue is detached while holding mutex_, but the promises are fulfilled
// and the callback dispatched only after the lock is released. Continuations
// may run inline (the default executor is an InlineExecutor) and re-enter
// this object, e.g. by calling close(), which would otherwise try to lock
// the non-recursive mutex_ a second time on the same thread.
void WatchmanConnection::failQueuedCommands(
    const folly::exception_wrapper& ex) {
  decltype(commandQ_) failed;
  {
    std::lock_guard<std::mutex> g(mutex_);
    failed.swap(commandQ_);
    broken_ = true;
  }

  for (auto& cmd : failed) {
    if (!cmd->promise.isFulfilled()) {
      cmd->promise.setException(ex);
    }
  }

  // If the user has explicitly closed the connection no need for callback
  if (callback_ && !closing_) {
    cpuExecutor_->add([shared_this = shared_from_this(), ex] {
      (*(shared_this->callback_))(folly::Try<folly::dynamic>(ex));
    });
  }
}

// Sends the next eligible command to the Watchman service.
// When `pop` is true the command at the head of the queue is considered
// finished and is discarded first.
void WatchmanConnection::sendCommand(bool pop) {
  std::shared_ptr<QueuedCommand> cmd;

  {
    std::lock_guard<std::mutex> g(mutex_);

    // The queue may already have been emptied by failQueuedCommands()
    // (for instance if the response continuation closed the connection),
    // so only pop when there is something to pop.
    if (pop && !commandQ_.empty()) {
      // We finished processing this one, discard it and focus
      // on the next item, if any.
      commandQ_.pop_front();
    }
    if (commandQ_.empty()) {
      return;
    }
    cmd = commandQ_.front();
  }

  sock_->writeChain(this, toBserIOBuf(cmd->cmd, serialization_opts()));
}

void WatchmanConnection::popAndSendCommand() {
  sendCommand(/* pop = */ true);
}

// Called when AsyncSocket::writeChain completes
void WatchmanConnection::writeSuccess() noexcept {
  // Don't care particularly
}

// Called when AsyncSocket::writeChain fails
void WatchmanConnection::writeErr(
    size_t,
    const folly::AsyncSocketException& ex) noexcept {
  failQueuedCommands(ex);
}

// Called when AsyncSocket wants to give us data
void WatchmanConnection::getReadBuffer(void** bufReturn, size_t* lenReturn) {
  std::lock_guard<std::mutex> g(mutex_);
  const auto ret = bufQ_.preallocate(2048, 2048);
  *bufReturn = ret.first;
  *lenReturn = ret.second;
}

// Called when AsyncSocket gave us data
void WatchmanConnection::readDataAvailable(size_t len) noexcept {
  {
    std::lock_guard<std::mutex> g(mutex_);
    bufQ_.postallocate(len);
  }
  cpuExecutor_->add([shared_this = shared_from_this()] {
    shared_this->decodeNextResponse();
  });
}

// Returns the next complete PDU from the front of the buffer queue, or
// nullptr if a complete PDU has not been received yet.
std::unique_ptr<folly::IOBuf> WatchmanConnection::splitNextPdu() {
  std::lock_guard<std::mutex> g(mutex_);
  if (!bufQ_.front()) {
    return nullptr;
  }

  // Do we have enough data to decode the next item?
  size_t pdu_len = 0;
  try {
    pdu_len = decodePduLength(bufQ_.front());
  } catch (const std::out_of_range&) {
    // Don't have enough data yet
    return nullptr;
  }

  if (pdu_len > bufQ_.chainLength()) {
    // Don't have enough data yet
    return nullptr;
  }

  // Remove the PDU blob from the front of the chain
  return bufQ_.split(pdu_len);
}

// Try to peel off one or more PDU's from our buffer queue.
// Decode each complete PDU from BSER -> dynamic and dispatch
// either the associated QueuedCommand or to the callback_ for
// unilateral responses.
// This is executed via the cpuExecutor.  We only allow one
// thread to carry out the decoding at a time so that the callbacks
// are triggered in the order that they are received.  It is possible
// for us to receive a large PDU followed by a small one and for the
// small one to finish decoding before the large one, so we must
// serialize the dispatching.
void WatchmanConnection::decodeNextResponse() {
  if (decoding_.exchange(true)) {
    return;
  }

  SCOPE_EXIT {
    decoding_.store(false);
  };

  while (true) {
    auto pdu = splitNextPdu();
    if (!pdu) {
      return;
    }

    try {
      auto decoded = parseBser(pdu.get());

      bool is_unilateral = false;
      // Check for a unilateral response
      for (const auto& k : kUnilateralLabels) {
        if (decoded.get_ptr(k)) {
          // This is a unilateral response
          if (callback_.has_value()) {
            callback_.value()(watchmanResponseToTry(std::move(decoded)));
            is_unilateral = true;
            break;
          }
          // No callback; usage error :-/
          failQueuedCommands(
              std::runtime_error("No unilateral callback has been installed"));
          return;
        }
      }
      if (is_unilateral) {
        continue;
      }

      // It's actually a command response; get the cmd so that we
      // can fulfil its promise
      std::shared_ptr<QueuedCommand> cmd;
      {
        std::lock_guard<std::mutex> g(mutex_);
        if (!commandQ_.empty()) {
          cmd = commandQ_.front();
        }
      }
      if (!cmd) {
        // failQueuedCommands() acquires mutex_ itself, so it must only be
        // called once the guard above has been released; calling it with
        // the lock held would self-deadlock on the non-recursive mutex.
        failQueuedCommands(std::runtime_error("No commands have been queued"));
        return;
      }

      // Dispatch outside of the lock in case it tries to send another
      // command
      cmd->promise.setTry(watchmanResponseToTry(std::move(decoded)));

      // Now we're in a position to send the next queued command.
      // We remove it after dispatching the try above in case that
      // queued up more commands; we want to be the one thing that
      // is responsible for sending the next queued command here
      popAndSendCommand();
    } catch (const std::exception& ex) {
      failQueuedCommands(
          folly::exception_wrapper{std::current_exception(), ex});
      return;
    }
  }
}

// Called when AsyncSocket hits EOF
void WatchmanConnection::readEOF() noexcept {
  failQueuedCommands(
      std::system_error(ENOTCONN, std::system_category(), "connection closed"));
}

// Called when AsyncSocket has a read error
void WatchmanConnection::readErr(
    const folly::AsyncSocketException& ex) noexcept {
  failQueuedCommands(ex);
}

} // namespace watchman