src/server/worker.cc
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "worker.h"
#include <event2/util.h>
#include <unistd.h>
#include <cstdint>
#include <stdexcept>
#include <string>
#include "event2/bufferevent.h"
#include "io_util.h"
#include "logging.h"
#include "scope_exit.h"
#include "thread_util.h"
#ifdef ENABLE_OPENSSL
#include <event2/bufferevent_ssl.h>
#include <openssl/err.h>
#include <openssl/ssl.h>
#endif
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <algorithm>
#include <utility>
#include "redis_connection.h"
#include "redis_request.h"
#include "server.h"
#include "storage/scripting.h"
Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) {
  if (!base_) throw std::runtime_error{"failed to create event base"};
timer_.reset(NewEvent(base_, -1, EV_PERSIST));
timeval tm = {10, 0};
evtimer_add(timer_.get(), &tm);
if (config->socket_fd != -1) {
if (const Status s = listenFD(config->socket_fd, config->port, config->backlog); !s.IsOK()) {
error("[worker] Failed to listen to socket with fd: {}, Error: {}", config->socket_fd, s.Msg());
exit(1);
}
} else {
const uint32_t ports[3] = {config->port, config->tls_port, 0};
for (const uint32_t *port = ports; *port; ++port) {
for (const auto &bind : config->binds) {
if (const Status s = listenTCP(bind, *port, config->backlog); !s.IsOK()) {
error("[worker] Failed to listen on: {}:{}, Error: {}", bind, *port, s.Msg());
exit(1);
}
info("[worker] Listening on: {}:{}", bind, *port);
}
}
}
lua_ = lua::CreateState();
}
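// Illustrative usage (hypothetical driver code, not part of this file): the
// server typically owns one Worker per worker thread, e.g.
//
//   auto worker = std::make_unique<Worker>(srv, config);
//   WorkerThread thread(std::move(worker));  // assuming WorkerThread takes ownership
//   thread.Start();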
Worker::~Worker() {
std::vector<redis::Connection *> conns;
conns.reserve(conns_.size() + monitor_conns_.size());
for (const auto &iter : conns_) {
conns.emplace_back(iter.second);
}
for (const auto &iter : monitor_conns_) {
conns.emplace_back(iter.second);
}
for (const auto &iter : conns) {
iter->Close();
}
timer_.reset();
if (rate_limit_group_) {
bufferevent_rate_limit_group_free(rate_limit_group_);
}
if (rate_limit_group_cfg_) {
ev_token_bucket_cfg_free(rate_limit_group_cfg_);
}
event_base_free(base_);
lua::DestroyState(lua_);
}
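// Fired by timer_ (armed in the constructor as a persistent 10-second timer)
// to evict clients that have been idle longer than the configured timeout.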
void Worker::TimerCB(int, [[maybe_unused]] int16_t events) {
auto config = srv->GetConfig();
if (config->timeout == 0) return;
KickoutIdleClients(config->timeout);
}
void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, [[maybe_unused]] sockaddr *address,
[[maybe_unused]] int socklen) {
int local_port = util::GetLocalPort(fd); // NOLINT
debug("[worker] New connection: fd={} from port: {} thread #{}", fd, local_port, fmt::streamed(tid_));
auto s = util::SockSetTcpKeepalive(fd, 120);
if (!s.IsOK()) {
error("[worker] Failed to set tcp-keepalive on socket. Error: {}", s.Msg());
evutil_closesocket(fd);
return;
}
s = util::SockSetTcpNoDelay(fd, 1);
if (!s.IsOK()) {
error("[worker] Failed to set tcp-nodelay on socket. Error: {}", s.Msg());
evutil_closesocket(fd);
return;
}
event_base *base = evconnlistener_get_base(listener);
auto ev_thread_safe_flags =
BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS | BEV_OPT_CLOSE_ON_FREE;
bufferevent *bev = nullptr;
ssl_st *ssl = nullptr;
#ifdef ENABLE_OPENSSL
if (uint32_t(local_port) == srv->GetConfig()->tls_port) {
ssl = SSL_new(srv->ssl_ctx.get());
if (!ssl) {
error("[worker] Failed to construct SSL structure for new connection: {}", fmt::streamed(SSLErrors{}));
evutil_closesocket(fd);
return;
}
bev = bufferevent_openssl_socket_new(base, fd, ssl, BUFFEREVENT_SSL_ACCEPTING, ev_thread_safe_flags);
} else {
bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags);
}
#else
bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags);
#endif
if (!bev) {
auto socket_err = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
#ifdef ENABLE_OPENSSL
error("[worker] Failed to construct socket for new connection: {}, SSL error: {}", socket_err,
fmt::streamed(SSLErrors{}));
if (ssl) SSL_free(ssl);
#else
error("[worker] Failed to construct socket for new connection: {}", socket_err);
#endif
evutil_closesocket(fd);
return;
}
#ifdef ENABLE_OPENSSL
if (uint32_t(local_port) == srv->GetConfig()->tls_port) {
bufferevent_openssl_set_allow_dirty_shutdown(bev, 1);
}
#endif
auto conn = new redis::Connection(bev, this);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ);
s = AddConnection(conn);
if (!s.IsOK()) {
std::string err_msg = redis::Error({Status::NotOK, s.Msg()});
s = util::SockSend(fd, err_msg, ssl);
if (!s.IsOK()) {
warn("[worker] Failed to send error response to socket: {}", s.Msg());
}
conn->Close();
return;
}
if (auto s = util::GetPeerAddr(fd)) {
auto [ip, port] = std::move(*s);
conn->SetAddr(ip, port);
}
if (rate_limit_group_) {
bufferevent_add_to_rate_limit_group(bev, rate_limit_group_);
}
}
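// Illustrative sketch (hypothetical initialization, assumed to happen
// elsewhere): rate_limit_group_ uses libevent's token-bucket API; capping the
// whole group at roughly 1 MiB/s in each direction would look like:
//
//   rate_limit_group_cfg_ = ev_token_bucket_cfg_new(1 << 20, 1 << 20, 1 << 20, 1 << 20, nullptr);
//   rate_limit_group_ = bufferevent_rate_limit_group_new(base_, rate_limit_group_cfg_);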
void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, [[maybe_unused]] sockaddr *address,
[[maybe_unused]] int socklen) {
debug("[worker] New connection: fd={} from unixsocket: {} thread #{}", fd, srv->GetConfig()->unixsocket,
fmt::streamed(tid_));
event_base *base = evconnlistener_get_base(listener);
auto ev_thread_safe_flags =
BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS | BEV_OPT_CLOSE_ON_FREE;
  bufferevent *bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags);
  if (!bev) {
    error("[worker] Failed to construct socket for new connection: {}",
          evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
    evutil_closesocket(fd);
    return;
  }
auto conn = new redis::Connection(bev, this);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ);
auto s = AddConnection(conn);
if (!s.IsOK()) {
s = util::SockSend(fd, redis::Error(s));
if (!s.IsOK()) {
warn("[worker] Failed to send error response to socket: {}", s.Msg());
}
conn->Close();
return;
}
conn->SetAddr(srv->GetConfig()->unixsocket, 0);
if (rate_limit_group_) {
bufferevent_add_to_rate_limit_group(bev, rate_limit_group_);
}
}
Status Worker::listenFD(int fd, uint32_t expected_port, int backlog) {
const uint32_t port = util::GetLocalPort(fd);
if (port != expected_port) {
return {Status::NotOK, "The port of the provided socket fd doesn't match the configured port"};
}
const int dup_fd = dup(fd);
if (dup_fd == -1) {
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}
evconnlistener *lev =
NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd);
listen_events_.emplace_back(lev);
info("[worker] Listening on dup'ed fd: {}", dup_fd);
return Status::OK();
}
Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
bool ipv6_used = strchr(host.data(), ':');
addrinfo hints = {};
hints.ai_family = ipv6_used ? AF_INET6 : AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
addrinfo *srv_info = nullptr;
if (int rv = getaddrinfo(host.data(), std::to_string(port).c_str(), &hints, &srv_info); rv != 0) {
return {Status::NotOK, gai_strerror(rv)};
}
auto exit = MakeScopeExit([srv_info] { freeaddrinfo(srv_info); });
for (auto p = srv_info; p != nullptr; p = p->ai_next) {
int fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (fd == -1) continue;
int sock_opt = 1;
if (ipv6_used && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &sock_opt, sizeof(sock_opt)) == -1) {
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &sock_opt, sizeof(sock_opt)) < 0) {
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}
    // SO_REUSEPORT lets every worker thread bind the same port (needed for multi-threaded binding on macOS)
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &sock_opt, sizeof(sock_opt)) < 0) {
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}
if (bind(fd, p->ai_addr, p->ai_addrlen)) {
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}
evutil_make_socket_nonblocking(fd);
auto lev =
NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
}
return Status::OK();
}
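// Note: IPV6_V6ONLY is set on IPv6 listeners above, so a "::" bind accepts
// IPv6 connections only; serving both families requires configuring both an
// IPv4 address (e.g. 0.0.0.0) and an IPv6 address (e.g. ::) in `binds`.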
Status Worker::ListenUnixSocket(const std::string &path, int perm, int backlog) {
unlink(path.c_str());
sockaddr_un sa{};
if (path.size() > sizeof(sa.sun_path) - 1) {
return {Status::NotOK, "unix socket path too long"};
}
sa.sun_family = AF_LOCAL;
strncpy(sa.sun_path, path.c_str(), sizeof(sa.sun_path) - 1);
int fd = socket(AF_LOCAL, SOCK_STREAM, 0);
if (fd == -1) {
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}
if (bind(fd, (sockaddr *)&sa, sizeof(sa)) < 0) {
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}
evutil_make_socket_nonblocking(fd);
auto lev = NewEvconnlistener<&Worker::newUnixSocketConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
if (perm != 0) {
chmod(sa.sun_path, (mode_t)perm);
}
return Status::OK();
}
void Worker::Run(std::thread::id tid) {
tid_ = tid;
if (event_base_dispatch(base_) != 0) {
error("[worker] Failed to run server, err: {}", strerror(errno));
}
is_terminated_ = true;
}
void Worker::Stop(uint32_t wait_seconds) {
for (const auto &lev : listen_events_) {
// It's unnecessary to close the listener fd since we have set the LEV_OPT_CLOSE_ON_FREE flag
evconnlistener_free(lev);
}
  // wait_seconds == 0 means stop immediately; otherwise wait up to
  // wait_seconds for the worker to process remaining requests before stopping.
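  // (event_base_loopexit keeps dispatching queued callbacks until the timeout
  // elapses; event_base_loopbreak exits as soon as the currently running
  // callback returns.)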
if (wait_seconds > 0) {
timeval tv = {wait_seconds, 0};
event_base_loopexit(base_, &tv);
} else {
event_base_loopbreak(base_);
}
}
Status Worker::AddConnection(redis::Connection *c) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(c->GetFD());
if (iter != conns_.end()) {
    return {Status::NotOK, "connection already exists"};
}
int max_clients = srv->GetConfig()->maxclients;
if (srv->IncrClientNum() >= max_clients) {
srv->DecrClientNum();
return {Status::NotOK, "max number of clients reached"};
}
conns_.emplace(c->GetFD(), c);
uint64_t id = srv->GetClientID();
c->SetID(id);
return Status::OK();
}
redis::Connection *Worker::removeConnection(int fd) {
redis::Connection *conn = nullptr;
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
conn = iter->second;
conns_.erase(iter);
srv->DecrClientNum();
}
iter = monitor_conns_.find(fd);
if (iter != monitor_conns_.end()) {
conn = iter->second;
monitor_conns_.erase(iter);
srv->DecrClientNum();
srv->DecrMonitorClientNum();
}
return conn;
}
// MigrateConnection moves the connection to another worker
// when reducing the number of workers.
//
// To make it simple, we would close the connection if it's
// blocked on a key or stream.
void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
if (!target || !conn) return;
  auto bev = conn->GetBufferEvent();
  // Disable read/write events so the connection isn't processed while it is
  // being migrated.
  bufferevent_disable(bev, EV_READ | EV_WRITE);
  // We cannot migrate a connection with a command still running: the old
  // worker might keep processing it, causing a data race.
  if (!conn->CanMigrate()) {
    // Re-enable the read/write events that were disabled above.
    bufferevent_enable(bev, EV_READ | EV_WRITE);
    return;
  }
// remove the connection from current worker
DetachConnection(conn);
if (!target->AddConnection(conn).IsOK()) {
conn->Close();
return;
}
bufferevent_base_set(target->base_, bev);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ | EV_WRITE);
conn->SetOwner(target);
}
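// Illustrative usage (hypothetical): when shrinking the worker pool, the
// server hands each connection of a retiring worker to a surviving one:
//
//   old_worker->MigrateConnection(surviving_worker, conn);
//
// Connections that cannot be migrated simply keep running on the old worker.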
void Worker::DetachConnection(redis::Connection *conn) {
if (!conn) return;
removeConnection(conn->GetFD());
if (rate_limit_group_) {
bufferevent_remove_from_rate_limit_group(conn->GetBufferEvent());
}
auto bev = conn->GetBufferEvent();
bufferevent_disable(bev, EV_READ | EV_WRITE);
bufferevent_setcb(bev, nullptr, nullptr, nullptr, nullptr);
}
void Worker::FreeConnection(redis::Connection *conn) {
if (!conn) return;
removeConnection(conn->GetFD());
srv->ResetWatchedKeys(conn);
if (rate_limit_group_) {
bufferevent_remove_from_rate_limit_group(conn->GetBufferEvent());
}
delete conn;
}
void Worker::FreeConnectionByID(int fd, uint64_t id) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end() && iter->second->GetID() == id) {
if (rate_limit_group_ != nullptr) {
bufferevent_remove_from_rate_limit_group(iter->second->GetBufferEvent());
}
delete iter->second;
conns_.erase(iter);
srv->DecrClientNum();
}
iter = monitor_conns_.find(fd);
if (iter != monitor_conns_.end() && iter->second->GetID() == id) {
delete iter->second;
monitor_conns_.erase(iter);
srv->DecrClientNum();
srv->DecrMonitorClientNum();
}
}
Status Worker::EnableWriteEvent(int fd) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
auto bev = iter->second->GetBufferEvent();
bufferevent_enable(bev, EV_WRITE);
return Status::OK();
}
return {Status::NotOK, "connection doesn't exist"};
}
Status Worker::Reply(int fd, const std::string &reply) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
iter->second->SetLastInteraction();
redis::Reply(iter->second->Output(), reply);
return Status::OK();
}
return {Status::NotOK, "connection doesn't exist"};
}
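// Illustrative (hypothetical caller): code on another thread can append a
// pre-encoded RESP payload to a connection owned by this worker, e.g.
//
//   worker->Reply(fd, redis::SimpleString("OK"));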
void Worker::BecomeMonitorConn(redis::Connection *conn) {
{
std::lock_guard<std::mutex> guard(conns_mu_);
conns_.erase(conn->GetFD());
monitor_conns_[conn->GetFD()] = conn;
}
srv->IncrMonitorClientNum();
conn->EnableFlag(redis::Connection::kMonitor);
}
void Worker::QuitMonitorConn(redis::Connection *conn) {
{
std::lock_guard<std::mutex> guard(conns_mu_);
monitor_conns_.erase(conn->GetFD());
conns_[conn->GetFD()] = conn;
}
srv->DecrMonitorClientNum();
conn->DisableFlag(redis::Connection::kMonitor);
}
void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) {
std::unique_lock<std::mutex> lock(conns_mu_);
for (const auto &iter : monitor_conns_) {
    if (conn == iter.second) continue;  // don't feed a monitor connection its own command
if (conn->GetNamespace() == iter.second->GetNamespace() || iter.second->GetNamespace() == kDefaultNamespace) {
iter.second->Reply(response);
}
}
}
std::string Worker::GetClientsStr() {
std::unique_lock<std::mutex> lock(conns_mu_);
std::string output;
for (const auto &iter : conns_) {
redis::Connection *conn = iter.second;
output.append(conn->ToString());
}
return output;
}
void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string &addr, uint64_t type, bool skipme,
int64_t *killed) {
std::lock_guard<std::mutex> guard(conns_mu_);
for (const auto &iter : conns_) {
redis::Connection *conn = iter.second;
if (skipme && self == conn) continue;
// no need to kill the client again if the kCloseAfterReply flag is set
if (conn->IsFlagEnabled(redis::Connection::kCloseAfterReply)) {
continue;
}
if ((type & conn->GetClientType()) ||
(!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) ||
(id != 0 && conn->GetID() == id)) {
conn->EnableFlag(redis::Connection::kCloseAfterReply);
      // Enable the write event so the worker wakes up ASAP to reply and then
      // close the connection.
      if (!conn->IsFlagEnabled(redis::Connection::kSlave)) {  // never enable events on a slave connection
auto bev = conn->GetBufferEvent();
bufferevent_enable(bev, EV_WRITE);
}
(*killed)++;
}
}
}
void Worker::LuaReset() {
auto lua = lua_.exchange(lua::CreateState());
lua::DestroyState(lua);
}
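// lua_gc with LUA_GCCOUNT reports the Lua heap size in KiB, hence the * 1024
// to convert to bytes.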
int64_t Worker::GetLuaMemorySize() { return (int64_t)lua_gc(lua_, LUA_GCCOUNT, 0) * 1024; }
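// Scans at most 50 connections per call, resuming from the fd cursor saved on
// the previous pass (last_iter_conn_fd_), and frees every connection that has
// been idle for at least `timeout` seconds.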
void Worker::KickoutIdleClients(int timeout) {
std::vector<std::pair<int, uint64_t>> to_be_killed_conns;
{
std::lock_guard<std::mutex> guard(conns_mu_);
if (conns_.empty()) {
return;
}
int iterations = std::min(static_cast<int>(conns_.size()), 50);
auto iter = conns_.upper_bound(last_iter_conn_fd_);
while (iterations--) {
if (iter == conns_.end()) iter = conns_.begin();
if (static_cast<int>(iter->second->GetIdleTime()) >= timeout) {
to_be_killed_conns.emplace_back(iter->first, iter->second->GetID());
}
iter++;
}
iter--;
last_iter_conn_fd_ = iter->first;
}
for (const auto &conn : to_be_killed_conns) {
FreeConnectionByID(conn.first, conn.second);
}
}
void WorkerThread::Start() {
auto s = util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); });
if (s) {
t_ = std::move(*s);
} else {
error("[worker] Failed to start worker thread, err: {}", s.Msg());
return;
}
info("[worker] Thread #{} started", fmt::streamed(t_.get_id()));
}
void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }
void WorkerThread::Join() {
if (auto s = util::ThreadJoin(t_); !s) {
warn("[worker] {}", s.Msg());
}
}
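// Illustrative lifecycle (hypothetical driver code, assuming this ctor shape):
//
//   WorkerThread wt(std::make_unique<Worker>(srv, config));
//   wt.Start();   // spawns the "worker" thread running the event loop
//   wt.Stop(0);   // 0 breaks the event loop immediately
//   wt.Join();    // waits for Run() to return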