common/recipes-core/rackmon2/rackmon/UnixSock.cpp (261 lines of code) (raw):

// Copyright 2021-present Facebook. All Rights Reserved. #include "UnixSock.h" #include <Log.h> #include <poll.h> #include <unistd.h> #include <csignal> #include <thread> namespace rackmonsvc { std::list<UnixService*> UnixService::activeServiceList{}; std::mutex UnixService::activeServiceListLock{}; std::tuple<struct sockaddr_un, size_t> UnixSock::getServiceAddr( const std::string& sockPath) { struct sockaddr_un ret {}; ret.sun_family = AF_UNIX; std::strncpy(ret.sun_path, sockPath.c_str(), sizeof(ret.sun_path) - 1); return std::make_tuple(ret, sockPath.size() + sizeof(ret.sun_family)); } int UnixSock::getServiceSock() { int ret = socket(AF_UNIX, SOCK_STREAM, 0); if (ret < 0) { throw std::system_error( std::error_code(errno, std::generic_category()), "socket_creation"); } return ret; } int UnixSock::createService(const std::string& sockPath) { int sock = getServiceSock(); if (access(sockPath.c_str(), F_OK) != -1) { logWarn << "WARNING: " << sockPath << " is being recreated" << std::endl; unlink(sockPath.c_str()); } auto [local, sockLen] = getServiceAddr(sockPath); if (bind(sock, (struct sockaddr*)&local, sockLen) != 0) { close(sock); throw std::system_error( std::error_code(errno, std::generic_category()), "socket_bind"); } if (listen(sock, 20) != 0) { close(sock); throw std::system_error( std::error_code(errno, std::generic_category()), "socket_listen"); } return sock; } int UnixSock::createClient(const std::string& sockPath) { int sock = getServiceSock(); auto [rackmondAddr, addrLen] = getServiceAddr(sockPath); if (connect(sock, (struct sockaddr*)&rackmondAddr, addrLen)) { close(sock); throw std::system_error( std::error_code(errno, std::generic_category()), "socket_connect"); } return sock; } UnixSock::~UnixSock() { if (sock_ != -1) close(sock_); } void UnixSock::sendChunk(const char* buf, uint16_t bufLen) { const int maxRetries = 3; int retries = 0; while (::send(sock_, &bufLen, sizeof(bufLen), 0) < 0) { retries++; if (retries == maxRetries) { throw std::system_error( std::error_code(errno, std::generic_category()), "send header"); } } if (bufLen == 0) return; uint16_t sentSize = 0; retries = 0; while (sentSize < bufLen) { int chunkSize = ::send(sock_, buf, bufLen - sentSize, 0); if (chunkSize < 0) { if (retries == maxRetries) { throw std::system_error( std::error_code(errno, std::generic_category()), "send body"); } continue; } retries = 0; sentSize += (uint16_t)chunkSize; buf += chunkSize; } } void UnixSock::send(const char* buf, size_t len) { const size_t kMaxChunkSize = 0xffff; // off == len is a special condition where we send a dummy // buf. This special condition of buf_len = 0 is handled // in sendChunk. Hence the condition of off <= len for (size_t off = 0; off <= len; off += kMaxChunkSize) { size_t rem = len - off; size_t csize = rem > kMaxChunkSize ? kMaxChunkSize : rem; sendChunk(buf + off, csize); } } bool UnixSock::recvChunk(std::vector<char>& resp) { uint16_t recvLen; const int maxRetries = 3; int retries = 0; while (::recv(sock_, &recvLen, sizeof(recvLen), 0) < 0) { retries++; if (retries == maxRetries) { throw std::system_error( std::error_code(errno, std::generic_category()), "recv header"); } } // Received dummy, that was our last chunk! if (recvLen == 0) return false; size_t off = resp.size(); resp.resize(off + recvLen); size_t received = 0; char* recvBuf = resp.data() + off; retries = 0; while (received < recvLen) { int chunkSize = ::recv(sock_, recvBuf, recvLen - received, 0); if (chunkSize < 0) { retries++; if (retries == maxRetries) { throw std::system_error( std::error_code(errno, std::generic_category()), "recv body"); } continue; } retries = 0; received += (size_t)chunkSize; recvBuf += (size_t)chunkSize; } // If we received the max size, we need to receive another // chunk. Return true, so recv() does another iteration. return recvLen == 0xffff; } void UnixSock::recv(std::vector<char>& resp) { resp.clear(); // recvchunk returns true if there are more chunks to receive. // iterate over it. recvchunk will resize resp as needed. while (recvChunk(resp) == true) ; } void UnixService::triggerExit(int /* unused */) { std::unique_lock lk(activeServiceListLock); for (auto& ent : activeServiceList) { ent->requestExit(); } } void UnixService::registerStaticExitHandler() { struct sigaction ign_action, exit_action; ign_action.sa_flags = 0; sigemptyset(&ign_action.sa_mask); ign_action.sa_handler = SIG_IGN; if (sigaction(SIGPIPE, &ign_action, NULL) != 0) { throw std::system_error( std::error_code(errno, std::generic_category()), "SIGPIPE handler"); } exit_action.sa_flags = 0; sigemptyset(&exit_action.sa_mask); exit_action.sa_handler = UnixService::triggerExit; if (sigaction(SIGTERM, &exit_action, NULL) != 0) { throw std::system_error( std::error_code(errno, std::generic_category()), "SIGTERM handler"); } if (sigaction(SIGINT, &exit_action, NULL) != 0) { throw std::system_error( std::error_code(errno, std::generic_category()), "SIGINT handler"); } } void UnixService::unregisterStaticExitHandler() { signal(SIGTERM, SIG_DFL); signal(SIGINT, SIG_DFL); } void UnixService::registerExitHandler() { if (pipe(backChannelFDs_) != 0) { std::system_error( std::error_code(errno, std::generic_category()), "Backchannel Pipe creation"); } std::unique_lock lk(activeServiceListLock); activeServiceList.push_back(this); if (activeServiceList.size() == 1) { registerStaticExitHandler(); } } void UnixService::unregisterExitHandler() { std::unique_lock lk(activeServiceListLock); for (auto it = activeServiceList.begin(); it != activeServiceList.end(); ++it) { if (*it == this) { activeServiceList.erase(it); break; } } if (activeServiceList.size() == 0) { unregisterStaticExitHandler(); } if (backChannelRequestor_ != -1) { close(backChannelRequestor_); backChannelRequestor_ = -1; } if (backChannelHandler_ != -1) { close(backChannelHandler_); backChannelHandler_ = -1; } } void UnixService::initialize(int /* argc */, char** /* argv */) { registerExitHandler(); sock_ = std::make_unique<UnixServiceSock>(sockPath_); } void UnixService::deinitialize() { unregisterExitHandler(); unlink(sockPath_.c_str()); } void UnixService::requestExit() { char c = 'c'; logInfo << "Got request exit" << std::endl; if (write(backChannelRequestor_, &c, 1) != 1) { logError << "Could not request rackmon svc to exit!" << std::endl; } } void UnixService::handleConnection(std::unique_ptr<UnixSock> sock) { std::vector<char> buf; try { sock->recv(buf); } catch (...) { logError << "Failed to receive message" << std::endl; return; } handleRequest(buf, *sock); } void UnixService::doLoop() { struct sockaddr_un client; struct pollfd pfd[2] = { {sock_->getSock(), POLLIN, 0}, {backChannelHandler_, POLLIN, 0}, }; while (1) { int ret; socklen_t clisocklen = sizeof(struct sockaddr_un); ret = poll(pfd, 2, -1); if (ret <= 0) { // This should be the common case. The entire thing // with the pipe is to handle the race condition when // we get a signal when we are actively handling a // previous request. logInfo << "Handling termination signal" << std::endl; break; } if (pfd[1].revents & POLLIN) { char c; if (read(pfd[1].fd, &c, 1) != 1) { logError << "Got something but no data!" << std::endl; } else if (c == 'c') { logInfo << "Handling termination request" << std::endl; break; } else { logError << "Got unknown command: " << c << std::endl; } } if (pfd[0].revents & POLLIN) { int clifd = accept(sock_->getSock(), (struct sockaddr*)&client, &clisocklen); if (clifd < 0) { logError << "Failed to accept new connection" << std::endl; continue; } auto clisock = std::make_unique<UnixSock>(clifd); auto tid = std::thread(&UnixService::handleConnection, this, std::move(clisock)); tid.detach(); } } } } // namespace rackmonsvc