src/cluster/replication.cc
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "replication.h"
#include <arpa/inet.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <algorithm>
#include <atomic>
#include <csignal>
#include <future>
#include <memory>
#include <string>
#include <string_view>
#include <thread>
#include "commands/error_constants.h"
#include "event_util.h"
#include "fmt/format.h"
#include "io_util.h"
#include "logging.h"
#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "scope_exit.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "status.h"
#include "storage/batch_debugger.h"
#include "thread_util.h"
#include "time_util.h"
#include "unique_fd.h"
#ifdef ENABLE_OPENSSL
#include <event2/bufferevent_ssl.h>
#include <openssl/err.h>
#include <openssl/ssl.h>
#endif
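// FeedSlaveThread runs on the master side: Start() spawns a "feed-replica" thread
// that first sends +OK to the replica connection (which the replica waits for in
// tryPSyncReadCB) and then streams WAL write batches to it in loop().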
Status FeedSlaveThread::Start() {
auto s = util::CreateThread("feed-replica", [this] {
sigset_t mask, omask;
sigemptyset(&mask);
sigemptyset(&omask);
sigaddset(&mask, SIGCHLD);
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &mask, &omask);
auto s = util::SockSend(conn_->GetFD(), redis::RESP_OK, conn_->GetBufferEvent());
if (!s.IsOK()) {
error("failed to send OK response to the replica: {}", s.Msg());
return;
}
this->loop();
});
if (s) {
t_ = std::move(*s);
} else {
    conn_ = nullptr;  // prevent the connection from being freed when the thread fails to start
}
return std::move(s);
}
void FeedSlaveThread::Stop() {
stop_ = true;
warn("Slave thread was terminated, would stop feeding the slave: {}", conn_->GetAddr());
}
void FeedSlaveThread::Join() {
if (auto s = util::ThreadJoin(t_); !s) {
warn("Slave thread operation failed: {}", s.Msg());
}
}
void FeedSlaveThread::checkLivenessIfNeed() {
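  // Only probe once every 1000 calls; with the 2ms idle sleep in loop(), this sends a
  // liveness ping roughly every two seconds.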
if (++interval_ % 1000) return;
const auto ping_command = redis::BulkString("ping");
auto s = util::SockSend(conn_->GetFD(), ping_command, conn_->GetBufferEvent());
if (!s.IsOK()) {
error("Ping slave [{}] err: {}, would stop the thread", conn_->GetAddr(), s.Msg());
Stop();
}
}
void FeedSlaveThread::loop() {
  // is_first_repl_batch works around a bug where replication could get stuck in a
  // dead loop when some sequence numbers were lost in the middle of the WAL. The
  // first batch is therefore always replicated immediately instead of waiting to
  // accumulate a large enough batch.
bool is_first_repl_batch = true;
uint32_t yield_microseconds = 2 * 1000;
std::string batches_bulk;
size_t updates_in_batches = 0;
while (!IsStopped()) {
auto curr_seq = next_repl_seq_.load();
if (!iter_ || !iter_->Valid()) {
      if (iter_) info("WAL was rotated, reopening the iterator");
if (!srv_->storage->WALHasNewData(curr_seq) || !srv_->storage->GetWALIter(curr_seq, &iter_).IsOK()) {
iter_ = nullptr;
usleep(yield_microseconds);
checkLivenessIfNeed();
continue;
}
}
    // iter_ is always valid at this point
auto batch = iter_->GetBatch();
if (batch.sequence != curr_seq) {
error(
"Fatal error encountered, WAL iterator is discrete, some seq might be lost, sequence {} expected, but got {}",
curr_seq, batch.sequence);
Stop();
return;
}
updates_in_batches += batch.writeBatchPtr->Count();
batches_bulk += redis::BulkString(batch.writeBatchPtr->Data());
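    // Each appended element is a RESP bulk string wrapping the raw serialized
    // WriteBatch ("$<len>\r\n<WriteBatch bytes>\r\n"), which is exactly what the
    // replica parses in ReplicationThread::incrementBatchLoopCB.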
    // 1. We must always send the first replication batch, as noted above.
    // 2. To avoid calling the 'write' system call too frequently when sending the
    //    replication stream, we pack multiple batches into one big bulk when possible
    //    and send it with a single write. The bulk is flushed once its size exceeds
    //    kMaxDelayBytes (16KB by default), or once the number of updates across the
    //    packed batches reaches kMaxDelayUpdates, to avoid delaying too many updates.
    // 3. To prevent the packing strategy from withholding the replication stream from
    //    the slave, we also flush whenever the current batch sequence is within
    //    kMaxDelayUpdates of the latest sequence number.
if (is_first_repl_batch || batches_bulk.size() >= kMaxDelayBytes || updates_in_batches >= kMaxDelayUpdates ||
srv_->storage->LatestSeqNumber() - batch.sequence <= kMaxDelayUpdates) {
      // Send the entire bulk, which may contain multiple batches
auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent());
if (!s.IsOK()) {
error("Write error while sending batch to slave: {}. batches: 0x{}", s.Msg(), util::StringToHex(batches_bulk));
Stop();
return;
}
is_first_repl_batch = false;
batches_bulk.clear();
if (batches_bulk.capacity() > kMaxDelayBytes * 2) batches_bulk.shrink_to_fit();
updates_in_batches = 0;
}
curr_seq = batch.sequence + batch.writeBatchPtr->Count();
next_repl_seq_.store(curr_seq);
while (!IsStopped() && !srv_->storage->WALHasNewData(curr_seq)) {
usleep(yield_microseconds);
checkLivenessIfNeed();
}
iter_->Next();
}
}
void SendString(bufferevent *bev, const std::string &data) {
auto output = bufferevent_get_output(bev);
evbuffer_add(output, data.c_str(), data.length());
}
void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int16_t events) {
if (events & BEV_EVENT_CONNECTED) {
// call write_cb when connected
bufferevent_data_cb write_cb = nullptr;
bufferevent_getcb(bev, nullptr, &write_cb, nullptr, nullptr);
if (write_cb) write_cb(bev, this);
return;
}
if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
error("[replication] connection error/eof, reconnect the master");
// Wait a bit and reconnect
repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
std::this_thread::sleep_for(std::chrono::seconds(1));
Stop();
Start();
}
}
void ReplicationThread::CallbacksStateMachine::SetReadCB(bufferevent *bev, bufferevent_data_cb cb) {
bufferevent_enable(bev, EV_READ);
bufferevent_setcb(bev, cb, nullptr, EventCallbackFunc<&CallbacksStateMachine::ConnEventCB>, this);
}
void ReplicationThread::CallbacksStateMachine::SetWriteCB(bufferevent *bev, bufferevent_data_cb cb) {
bufferevent_enable(bev, EV_WRITE);
bufferevent_setcb(bev, nullptr, cb, EventCallbackFunc<&CallbacksStateMachine::ConnEventCB>, this);
}
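// ReadWriteCB drives the handler list as a state machine. Each handler returns a
// CBState: NEXT advances to the following handler, PREV retries the previous one,
// AGAIN keeps waiting on the current handler for more data, QUIT frees the
// bufferevent and marks the replication state as error, and RESTART tears the
// connection down and re-runs the whole state machine after a 10-second delay.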
void ReplicationThread::CallbacksStateMachine::ReadWriteCB(bufferevent *bev) {
LOOP_LABEL:
assert(handler_idx_ <= handlers_.size());
debug("[replication] Execute handler[{}]", getHandlerName(handler_idx_));
auto st = getHandlerFunc(handler_idx_)(repl_, bev);
repl_->last_io_time_secs_.store(util::GetTimeStamp(), std::memory_order_relaxed);
switch (st) {
case CBState::NEXT:
++handler_idx_;
[[fallthrough]];
case CBState::PREV:
if (st == CBState::PREV) --handler_idx_;
if (getHandlerEventType(handler_idx_) == WRITE) {
SetWriteCB(bev, EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
} else {
SetReadCB(bev, EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
}
// invoke the read handler (of next step) directly, as the bev might
// have the data already.
goto LOOP_LABEL; // NOLINT
case CBState::AGAIN:
break;
    case CBState::QUIT:  // a state that cannot be retried, or all steps have been executed
bufferevent_free(bev);
bev_ = nullptr;
repl_->repl_state_.store(kReplError, std::memory_order_relaxed);
break;
case CBState::RESTART: // state that can be retried some time later
Stop();
if (repl_->stop_flag_) {
info("[replication] Wouldn't restart while the replication thread was stopped");
break;
}
repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
info("[replication] Retry in 10 seconds");
std::this_thread::sleep_for(std::chrono::seconds(10));
Start();
}
}
void ReplicationThread::CallbacksStateMachine::Start() {
struct bufferevent *bev = nullptr;
if (handlers_.empty()) {
return;
}
// Note: It may cause data races to use 'masterauth' directly.
// It is acceptable because password change is a low frequency operation.
if (!repl_->srv_->GetConfig()->masterauth.empty()) {
handlers_.emplace_front(CallbacksStateMachine::READ, "auth read", &ReplicationThread::authReadCB);
handlers_.emplace_front(CallbacksStateMachine::WRITE, "auth write", &ReplicationThread::authWriteCB);
}
uint64_t last_connect_timestamp = 0;
while (!repl_->stop_flag_ && bev == nullptr) {
if (util::GetTimeStampMS() - last_connect_timestamp < 1000) {
      // avoid reconnecting too frequently when the master is down and the connection is refused
sleep(1);
}
last_connect_timestamp = util::GetTimeStampMS();
auto cfd = util::SockConnect(repl_->host_, repl_->port_, repl_->srv_->GetConfig()->replication_connect_timeout_ms);
if (!cfd) {
error("[replication] Failed to connect the master, err: {}", cfd.Msg());
continue;
}
#ifdef ENABLE_OPENSSL
SSL *ssl = nullptr;
if (repl_->srv_->GetConfig()->tls_replication) {
ssl = SSL_new(repl_->srv_->ssl_ctx.get());
if (!ssl) {
error("Failed to construct SSL structure for new connection: {}", fmt::streamed(SSLErrors{}));
evutil_closesocket(*cfd);
return;
}
bev = bufferevent_openssl_socket_new(repl_->base_, *cfd, ssl, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_CLOSE_ON_FREE);
} else {
bev = bufferevent_socket_new(repl_->base_, *cfd, BEV_OPT_CLOSE_ON_FREE);
}
#else
bev = bufferevent_socket_new(repl_->base_, *cfd, BEV_OPT_CLOSE_ON_FREE);
#endif
if (bev == nullptr) {
#ifdef ENABLE_OPENSSL
if (ssl) SSL_free(ssl);
#endif
close(*cfd);
error("[replication] Failed to create the event socket");
continue;
}
#ifdef ENABLE_OPENSSL
if (repl_->srv_->GetConfig()->tls_replication) {
bufferevent_openssl_set_allow_dirty_shutdown(bev, 1);
}
#endif
}
  if (bev == nullptr) {  // failed to connect to the master and received the stop signal
return;
}
handler_idx_ = 0;
repl_->incr_state_ = Incr_batch_size;
if (getHandlerEventType(0) == WRITE) {
SetWriteCB(bev, EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
} else {
SetReadCB(bev, EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
}
bev_ = bev;
}
void ReplicationThread::CallbacksStateMachine::Stop() {
if (bev_) {
bufferevent_free(bev_);
bev_ = nullptr;
}
}
ReplicationThread::ReplicationThread(std::string host, uint32_t port, Server *srv)
: host_(std::move(host)),
port_(port),
srv_(srv),
storage_(srv->storage),
repl_state_(kReplConnecting),
psync_steps_(
this,
CallbacksStateMachine::CallbackList{
CallbackType{CallbacksStateMachine::WRITE, "dbname write", &ReplicationThread::checkDBNameWriteCB},
CallbackType{CallbacksStateMachine::READ, "dbname read", &ReplicationThread::checkDBNameReadCB},
CallbackType{CallbacksStateMachine::WRITE, "replconf write", &ReplicationThread::replConfWriteCB},
CallbackType{CallbacksStateMachine::READ, "replconf read", &ReplicationThread::replConfReadCB},
CallbackType{CallbacksStateMachine::WRITE, "psync write", &ReplicationThread::tryPSyncWriteCB},
CallbackType{CallbacksStateMachine::READ, "psync read", &ReplicationThread::tryPSyncReadCB},
CallbackType{CallbacksStateMachine::READ, "batch loop", &ReplicationThread::incrementBatchLoopCB}}),
fullsync_steps_(
this, CallbacksStateMachine::CallbackList{
CallbackType{CallbacksStateMachine::WRITE, "fullsync write", &ReplicationThread::fullSyncWriteCB},
CallbackType{CallbacksStateMachine::READ, "fullsync read", &ReplicationThread::fullSyncReadCB}}) {}
Status ReplicationThread::Start(std::function<bool()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb) {
pre_fullsync_cb_ = std::move(pre_fullsync_cb);
post_fullsync_cb_ = std::move(post_fullsync_cb);
// Clean synced checkpoint from old master because replica starts to follow new master
auto s = rocksdb::DestroyDB(srv_->GetConfig()->sync_checkpoint_dir, rocksdb::Options());
if (!s.ok()) {
warn("Can't clean synced checkpoint from master, error: {}", s.ToString());
} else {
warn("Clean old synced checkpoint successfully");
}
// cleanup the old backups, so we can start replication in a clean state
storage_->PurgeOldBackups(0, 0);
t_ = GET_OR_RET(util::CreateThread("master-repl", [this] {
this->run();
assert(stop_flag_);
}));
return Status::OK();
}
void ReplicationThread::Stop() {
if (stop_flag_) return;
  stop_flag_ = true;  // the stopping procedure is asynchronous and handled by the timer
if (auto s = util::ThreadJoin(t_); !s) {
warn("Replication thread operation failed: {}", s.Msg());
}
info("[replication] Stopped");
}
/*
 * Connect to the master and run the following steps asynchronously:
 * - CheckDBName
 * - TryPsync
 *   - if it succeeds, enter IncrementBatchLoop
 *   - if not, run FullSync and restart TryPsync when it is done
 */
void ReplicationThread::run() {
base_ = event_base_new();
if (base_ == nullptr) {
error("[replication] Failed to create new ev base");
return;
}
psync_steps_.Start();
auto timer = UniqueEvent(NewEvent(base_, -1, EV_PERSIST));
timeval tmo{0, 100000}; // 100 ms
evtimer_add(timer.get(), &tmo);
event_base_dispatch(base_);
timer.reset();
event_base_free(base_);
}
ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
SendString(bev, redis::ArrayOfBulkStrings({"AUTH", srv_->GetConfig()->masterauth}));
info("[replication] Auth request was sent, waiting for response");
repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
return CBState::NEXT;
}
inline bool ResponseLineIsOK(std::string_view line) { return line == RESP_PREFIX_SIMPLE_STRING "OK"; }
ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) { // NOLINT
auto input = bufferevent_get_input(bev);
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (!ResponseLineIsOK(line.View())) {
// Auth failed
error("[replication] Auth failed: {}", line.get());
return CBState::RESTART;
}
info("[replication] Auth response was received, continue...");
return CBState::NEXT;
}
ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent *bev) {
SendString(bev, redis::ArrayOfBulkStrings({"_db_name"}));
repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
info("[replication] Check db name request was sent, waiting for response");
return CBState::NEXT;
}
ReplicationThread::CBState ReplicationThread::checkDBNameReadCB(bufferevent *bev) {
auto input = bufferevent_get_input(bev);
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
if (isRestoringError(line.View())) {
warn("The master was restoring the db, retry later");
} else {
error("Failed to get the db name, {}", line.get());
}
return CBState::RESTART;
}
std::string db_name = storage_->GetName();
if (line.length == db_name.size() && !strncmp(line.get(), db_name.data(), line.length)) {
    // DB names match, so continue to the next step: TryPsync
info("[replication] DB name is valid, continue...");
return CBState::NEXT;
}
error("[replication] Mismatched the db name, local: {}, remote: {}", db_name, line.get());
return CBState::RESTART;
}
ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent *bev) {
auto config = srv_->GetConfig();
auto port = config->replica_announce_port > 0 ? config->replica_announce_port : config->port;
std::vector<std::string> data_to_send{"replconf", "listening-port", std::to_string(port)};
if (!next_try_without_announce_ip_address_ && !config->replica_announce_ip.empty()) {
data_to_send.emplace_back("ip-address");
data_to_send.emplace_back(config->replica_announce_ip);
}
SendString(bev, redis::ArrayOfBulkStrings(data_to_send));
repl_state_.store(kReplReplConf, std::memory_order_relaxed);
info("[replication] replconf request was sent, waiting for response");
return CBState::NEXT;
}
ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev) {
auto input = bufferevent_get_input(bev);
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
// on unknown option: first try without announce ip, if it fails again - do nothing (to prevent infinite loop)
if (isUnknownOption(line.View()) && !next_try_without_announce_ip_address_) {
next_try_without_announce_ip_address_ = true;
warn("The old version master, can't handle ip-address, try without it again");
// Retry previous state, i.e. send replconf again
return CBState::PREV;
}
if (line[0] == '-' && isRestoringError(line.View())) {
warn("The master was restoring the db, retry later");
return CBState::RESTART;
}
if (!ResponseLineIsOK(line.View())) {
warn("[replication] Failed to replconf: {}", line.get() + 1);
    // stay backward compatible with old versions that don't support the replconf command
return CBState::NEXT;
} else {
info("[replication] replconf is ok, start psync");
return CBState::NEXT;
}
}
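// Two PSYNC forms are supported: the old one, "PSYNC <next_seq>", carries only the
// next sequence number; the new one, "PSYNC <replid> <next_seq>", also carries the
// replication id and is used when use_rsid_psync is enabled, a valid replication id
// can be found in the WAL or the DB, and the replica isn't retrying the old form.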
ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent *bev) {
auto cur_seq = storage_->LatestSeqNumber();
auto next_seq = cur_seq + 1;
std::string replid;
// Get replication id
std::string replid_in_wal = storage_->GetReplIdFromWalBySeq(cur_seq);
// Set if valid replication id
if (replid_in_wal.length() == kReplIdLength) {
replid = replid_in_wal;
} else {
    // There may be no WAL at all, so try to get the replication id from the DB: the
    // master always writes the replication id into the DB before any other operation
    // when starting a new "replication history".
std::string replid_in_db = storage_->GetReplIdFromDbEngine();
if (replid_in_db.length() == kReplIdLength) {
replid = replid_in_db;
}
}
  // Fall back to the old PSYNC (which only takes the next sequence id) to stay
  // compatible with old masters, and also when the replica can't find a replication
  // id in either the WAL or the DB.
if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ || replid.length() != kReplIdLength) {
next_try_old_psync_ = false; // Reset next_try_old_psync_
SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", std::to_string(next_seq)}));
info("[replication] Try to use psync, next seq: {}", next_seq);
} else {
// NEW PSYNC "Unique Replication Sequence ID": replication id and sequence id
SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", replid, std::to_string(next_seq)}));
info("[replication] Try to use new psync, current unique replication sequence id: {}:{}", replid, cur_seq);
}
repl_state_.store(kReplSendPSync, std::memory_order_relaxed);
return CBState::NEXT;
}
ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
auto input = bufferevent_get_input(bev);
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-' && isRestoringError(line.View())) {
warn("The master was restoring the db, retry later");
return CBState::RESTART;
}
if (line[0] == '-' && isWrongPsyncNum(line.View())) {
next_try_old_psync_ = true;
warn("The old version master, can't handle new PSYNC, try old PSYNC again");
// Retry previous state, i.e. send PSYNC again
return CBState::PREV;
}
if (!ResponseLineIsOK(line.View())) {
// PSYNC isn't OK, we should use FullSync
// Switch to fullsync state machine
fullsync_steps_.Start();
info("[replication] Failed to psync, error: {}, switch to fullsync", line.get());
return CBState::QUIT;
} else {
// PSYNC is OK, use IncrementBatchLoop
info("[replication] PSync is ok, start increment batch loop");
return CBState::NEXT;
}
}
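// The incremental replication stream is a sequence of RESP bulk strings, each
// carrying a raw serialized rocksdb::WriteBatch. incr_state_ alternates between
// Incr_batch_size (read the "$<len>" size line) and Incr_batch_data (read <len> bytes
// plus the trailing CRLF). A bulk whose payload is "ping" is a heartbeat from the
// master and is not applied to the DB.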
ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *bev) {
repl_state_.store(kReplConnected, std::memory_order_relaxed);
auto input = bufferevent_get_input(bev);
while (true) {
switch (incr_state_) {
case Incr_batch_size: {
// Read bulk length
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, nullptr, 10) : 0;
if (incr_bulk_len_ == 0) {
error("[replication] Invalid increment data size");
return CBState::RESTART;
}
incr_state_ = Incr_batch_data;
break;
}
      case Incr_batch_data: {
// Read bulk data (batch data)
if (incr_bulk_len_ + 2 > evbuffer_get_length(input)) { // If data not enough
return CBState::AGAIN;
}
const char *bulk_data =
reinterpret_cast<const char *>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
evbuffer_drain(input, incr_bulk_len_ + 2);
incr_state_ = Incr_batch_size;
if (bulk_string == "ping") {
          // The master sends a ping heartbeat to check whether the slave is still
          // alive; don't write the ping into the DB.
return CBState::AGAIN;
}
rocksdb::WriteBatch batch(std::move(bulk_string));
auto s = storage_->ReplicaApplyWriteBatch(&batch);
if (!s.IsOK()) {
error("[replication] CRITICAL - Failed to write batch to local, {}. batch: 0x{}", s.Msg(),
util::StringToHex(batch.Data()));
return CBState::RESTART;
}
s = parseWriteBatch(batch);
if (!s.IsOK()) {
error("[replication] CRITICAL - failed to parse write batch 0x{}: {}", util::StringToHex(batch.Data()),
s.Msg());
return CBState::RESTART;
}
        break;
      }
}
}
}
ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent *bev) {
SendString(bev, redis::ArrayOfBulkStrings({"_fetch_meta"}));
repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
info("[replication] Start syncing data with fullsync");
return CBState::NEXT;
}
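// Full sync flow: with an old-version master (master_use_repl_port enabled), the
// reply consists of a backup meta id, the meta file size, and the meta content; with
// a new-version master, the reply is a comma-separated list of checkpoint file names.
// The switch cases below fall through so that all stages can be consumed from the
// same buffered input. Once the file list is known, the files are fetched in parallel
// and the backup/checkpoint is restored locally.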
ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
auto input = bufferevent_get_input(bev);
switch (fullsync_state_) {
case kFetchMetaID: {
// New version master only sends meta file content
if (!srv_->GetConfig()->master_use_repl_port) {
fullsync_state_ = kFetchMetaContent;
return CBState::AGAIN;
}
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
error("[replication] Failed to fetch meta id: {}", line.get());
return CBState::RESTART;
}
fullsync_meta_id_ = static_cast<rocksdb::BackupID>(line.length > 0 ? std::strtoul(line.get(), nullptr, 10) : 0);
if (fullsync_meta_id_ == 0) {
error("[replication] Invalid meta id received");
return CBState::RESTART;
}
fullsync_state_ = kFetchMetaSize;
info("[replication] Succeed fetching meta id: {}", fullsync_meta_id_);
}
case kFetchMetaSize: {
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
error("[replication] Failed to fetch meta size: {}", line.get());
return CBState::RESTART;
}
fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(), nullptr, 10) : 0;
if (fullsync_filesize_ == 0) {
error("[replication] Invalid meta file size received");
return CBState::RESTART;
}
fullsync_state_ = kFetchMetaContent;
info("[replication] Succeed fetching meta size: {}", fullsync_filesize_);
}
case kFetchMetaContent: {
std::string target_dir;
engine::Storage::ReplDataManager::MetaInfo meta;
// Master using old version
if (srv_->GetConfig()->master_use_repl_port) {
if (evbuffer_get_length(input) < fullsync_filesize_) {
return CBState::AGAIN;
}
auto s = engine::Storage::ReplDataManager::ParseMetaAndSave(storage_, fullsync_meta_id_, input, &meta);
if (!s.IsOK()) {
error("[replication] Failed to parse meta and save: {}", s.Msg());
return CBState::AGAIN;
}
target_dir = srv_->GetConfig()->backup_sync_dir;
} else {
// Master using new version
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
error("[replication] Failed to fetch meta info: {}", line.get());
return CBState::RESTART;
}
std::vector<std::string> need_files = util::Split(std::string(line.get()), ",");
for (const auto &f : need_files) {
meta.files.emplace_back(f, 0);
}
target_dir = srv_->GetConfig()->sync_checkpoint_dir;
        // Clean up invalid files of the old checkpoint. The "CURRENT" file must be
        // treated as invalid because we identify a file by its file number, and
        // "CURRENT" is the only file that doesn't have one.
auto iter = std::find(need_files.begin(), need_files.end(), "CURRENT");
if (iter != need_files.end()) need_files.erase(iter);
auto s = engine::Storage::ReplDataManager::CleanInvalidFiles(storage_, target_dir, need_files);
if (!s.IsOK()) {
warn("[replication] Failed to clean up invalid files of the old checkpoint, error: {}", s.Msg());
warn("[replication] Try to clean all checkpoint files");
auto s = rocksdb::DestroyDB(target_dir, rocksdb::Options());
if (!s.ok()) {
warn("[replication] Failed to clean all checkpoint files, error: {}", s.ToString());
}
}
}
assert(evbuffer_get_length(input) == 0);
fullsync_state_ = kFetchMetaID;
info("[replication] Succeeded fetching full data files info, fetching files in parallel");
bool pre_fullsync_done = false;
      // If 'slave-empty-db-before-fullsync' is yes, call 'pre_fullsync_cb_' first,
      // just as when reloading the database. Since we don't want the slave to occupy
      // too much disk space, the entire database is simply emptied up front.
if (srv_->GetConfig()->slave_empty_db_before_fullsync) {
if (!pre_fullsync_cb_()) return CBState::RESTART;
pre_fullsync_done = true;
storage_->EmptyDB();
}
repl_state_.store(kReplFetchSST, std::memory_order_relaxed);
auto s = parallelFetchFile(target_dir, meta.files);
if (!s.IsOK()) {
if (pre_fullsync_done) post_fullsync_cb_();
error("[replication] Failed to parallel fetch files while {}", s.Msg());
return CBState::RESTART;
}
info("[replication] Succeeded fetching files in parallel, restoring the backup");
// Don't need to call 'pre_fullsync_cb_' again if it was called before
if (!pre_fullsync_done && !pre_fullsync_cb_()) return CBState::RESTART;
      // For old-version masters, the data snapshot is implemented with a rocksdb backup
if (srv_->GetConfig()->master_use_repl_port) {
s = storage_->RestoreFromBackup();
} else {
s = storage_->RestoreFromCheckpoint();
}
if (!s.IsOK()) {
error("[replication] Failed to restore backup while {}, restart fullsync", s.Msg());
post_fullsync_cb_();
return CBState::RESTART;
}
info("[replication] Succeeded restoring the backup, fullsync was finish");
post_fullsync_cb_();
      // Namespaces need to be reloaded from the DB after the full sync is done,
      // otherwise they are not visible on the replica.
s = srv_->GetNamespace()->LoadAndRewrite();
if (!s.IsOK()) {
error("[replication] Failed to load and rewrite namespace: {}", s.Msg());
}
// Switch to psync state machine again
psync_steps_.Start();
return CBState::QUIT;
}
}
unreachable();
}
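// Fetch the given files over one or more additional connections: up to 4 workers are
// used when there are more than 20 files, and worker 'tid' takes every
// 'concurrency'-th file. Files that already exist locally with a matching CRC are
// skipped.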
Status ReplicationThread::parallelFetchFile(const std::string &dir,
const std::vector<std::pair<std::string, uint32_t>> &files) {
size_t concurrency = 1;
if (files.size() > 20) {
// Use 4 threads to download files in parallel
concurrency = 4;
}
std::atomic<uint32_t> fetch_cnt = {0};
std::atomic<uint32_t> skip_cnt = {0};
std::vector<std::future<Status>> results;
for (size_t tid = 0; tid < concurrency; ++tid) {
results.push_back(
std::async(std::launch::async, [this, dir, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status {
if (this->stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
ssl_st *ssl = nullptr;
#ifdef ENABLE_OPENSSL
if (this->srv_->GetConfig()->tls_replication) {
ssl = SSL_new(this->srv_->ssl_ctx.get());
}
auto exit = MakeScopeExit([ssl] { SSL_free(ssl); });
#endif
int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl,
this->srv_->GetConfig()->replication_connect_timeout_ms,
this->srv_->GetConfig()->replication_recv_timeout_ms)
.Prefixed("connect the server err"));
#ifdef ENABLE_OPENSSL
exit.Disable();
#endif
UniqueFD unique_fd{sock_fd};
auto s = this->sendAuth(sock_fd, ssl);
if (!s.IsOK()) {
return s.Prefixed("send the auth command err");
}
std::vector<std::string> fetch_files;
std::vector<uint32_t> crcs;
for (auto f_idx = tid; f_idx < files.size(); f_idx += concurrency) {
if (this->stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
const auto &f_name = files[f_idx].first;
const auto &f_crc = files[f_idx].second;
// Don't fetch existing files
if (engine::Storage::ReplDataManager::FileExists(this->storage_, dir, f_name, f_crc)) {
skip_cnt.fetch_add(1);
uint32_t cur_skip_cnt = skip_cnt.load();
uint32_t cur_fetch_cnt = fetch_cnt.load();
info("[skip] {} {}, skip count: {}, fetch count: {}, progress: {} / {}", f_name, f_crc, cur_skip_cnt,
cur_fetch_cnt, (cur_skip_cnt + cur_fetch_cnt), files.size());
continue;
}
fetch_files.push_back(f_name);
crcs.push_back(f_crc);
}
unsigned files_count = files.size();
FetchFileCallback fn = [&fetch_cnt, &skip_cnt, files_count](const std::string &fetch_file,
uint32_t fetch_crc) {
fetch_cnt.fetch_add(1);
uint32_t cur_skip_cnt = skip_cnt.load();
uint32_t cur_fetch_cnt = fetch_cnt.load();
info("[fetch] Fetched {}, crc32 {}, skip count: {}, fetch count: {}, progress: {} / {}", fetch_file,
fetch_crc, cur_skip_cnt, cur_fetch_cnt, cur_skip_cnt + cur_fetch_cnt, files_count);
};
          // An old-version master only supports fetching a single file per command,
          // so all files have to be fetched through multiple command round trips.
if (srv_->GetConfig()->master_use_repl_port) {
for (unsigned i = 0; i < fetch_files.size(); i++) {
s = this->fetchFiles(sock_fd, dir, {fetch_files[i]}, {crcs[i]}, fn, ssl);
if (!s.IsOK()) break;
}
} else {
if (!fetch_files.empty()) {
s = this->fetchFiles(sock_fd, dir, fetch_files, crcs, fn, ssl);
}
}
return s;
}));
}
  // Wait until all fetch tasks finish
for (auto &f : results) {
Status s = f.get();
if (!s.IsOK()) return s;
}
return Status::OK();
}
Status ReplicationThread::sendAuth(int sock_fd, ssl_st *ssl) {
// Send auth when needed
std::string auth = srv_->GetConfig()->masterauth;
if (!auth.empty()) {
UniqueEvbuf evbuf;
const auto auth_command = redis::ArrayOfBulkStrings({"AUTH", auth});
auto s = util::SockSend(sock_fd, auth_command, ssl);
if (!s.IsOK()) return s.Prefixed("send auth command err");
while (true) {
if (auto s = util::EvbufferRead(evbuf.get(), sock_fd, -1, ssl); !s) {
return std::move(s).Prefixed("read auth response err");
}
UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
if (!line) continue;
if (!ResponseLineIsOK(line.View())) {
return {Status::NotOK, "auth got invalid response"};
}
break;
}
}
return Status::OK();
}
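// A single file arrives as a size line followed by exactly that many raw bytes. The
// data is streamed into a tmp file while a crc32c checksum is computed; if the
// expected CRC is non-zero and doesn't match, the fetch fails, otherwise the tmp file
// is swapped into its final name and the progress callback is invoked.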
Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, const std::string &file,
uint32_t crc, const FetchFileCallback &fn, ssl_st *ssl) {
size_t file_size = 0;
// Read file size line
while (true) {
UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
if (s.Is<Status::TryAgain>()) {
if (stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
continue;
}
return std::move(s).Prefixed("read size");
}
continue;
}
if (line[0] == '-') {
std::string msg(line.get());
return {Status::NotOK, msg};
}
file_size = line.length > 0 ? std::strtoull(line.get(), nullptr, 10) : 0;
break;
}
// Write to tmp file
auto tmp_file = engine::Storage::ReplDataManager::NewTmpFile(storage_, dir, file);
if (!tmp_file) {
return {Status::NotOK, "unable to create tmp file"};
}
size_t remain = file_size;
uint32_t tmp_crc = 0;
char data[16 * 1024];
while (remain != 0) {
if (evbuffer_get_length(evbuf) > 0) {
auto data_len = evbuffer_remove(evbuf, data, remain > 16 * 1024 ? 16 * 1024 : remain);
if (data_len == 0) continue;
if (data_len < 0) {
return {Status::NotOK, "read sst file data error"};
}
tmp_file->Append(rocksdb::Slice(data, data_len));
tmp_crc = rocksdb::crc32c::Extend(tmp_crc, data, data_len);
remain -= data_len;
} else {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
if (s.Is<Status::TryAgain>()) {
if (stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
continue;
}
return std::move(s).Prefixed("read sst file");
}
}
}
// Verify file crc checksum if crc is not 0
if (crc && crc != tmp_crc) {
return {Status::NotOK, fmt::format("CRC mismatched, {} was expected but got {}", crc, tmp_crc)};
}
  // File is OK, rename it to its final name
auto s = engine::Storage::ReplDataManager::SwapTmpFile(storage_, dir, file);
if (!s.IsOK()) return s;
// Call fetch file callback function
fn(file, crc);
return Status::OK();
}
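// Request several files with a single "_fetch_file" command (names joined by commas),
// then consume the file payloads one by one, in request order, from the same evbuffer.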
Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const std::vector<std::string> &files,
const std::vector<uint32_t> &crcs, const FetchFileCallback &fn, ssl_st *ssl) {
std::string files_str;
for (const auto &file : files) {
files_str += file;
files_str.push_back(',');
}
files_str.pop_back();
const auto fetch_command = redis::ArrayOfBulkStrings({"_fetch_file", files_str});
auto s = util::SockSend(sock_fd, fetch_command, ssl);
if (!s.IsOK()) return s.Prefixed("send fetch file command");
UniqueEvbuf evbuf;
for (unsigned i = 0; i < files.size(); i++) {
debug("[fetch] Start to fetch file {}", files[i]);
s = fetchFile(sock_fd, evbuf.get(), dir, files[i], crcs[i], fn, ssl);
if (!s.IsOK()) {
s = Status(Status::NotOK, "fetch file err: " + s.Msg());
warn("[fetch] Fail to fetch file {}, err: {}", files[i], s.Msg());
break;
}
debug("[fetch] Succeed fetching file {}", files[i]);
// Just for tests
if (srv_->GetConfig()->fullsync_recv_file_delay) {
sleep(srv_->GetConfig()->fullsync_recv_file_delay);
}
}
return s;
}
// Check if stop_flag_ is set; if so, tear down the replication
void ReplicationThread::TimerCB(int, int16_t) {
// DLOG(INFO) << "[replication] timer";
if (stop_flag_) {
info("[replication] Stop ev loop");
event_base_loopbreak(base_);
psync_steps_.Stop();
fullsync_steps_.Stop();
}
}
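// Besides applying the write batch to the local DB, the replica has to mirror some of
// the master's side effects: publishing pub/sub messages, executing propagated
// commands (e.g. script commands), reloading namespaces, and emitting stream
// "entry added" notifications. WriteBatchHandler below classifies a batch by the
// column family it touches so that the right action can be taken here.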
Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch &write_batch) {
WriteBatchHandler write_batch_handler;
auto db_status = write_batch.Iterate(&write_batch_handler);
if (!db_status.ok()) return {Status::NotOK, "failed to iterate over write batch: " + db_status.ToString()};
switch (write_batch_handler.Type()) {
case kBatchTypePublish:
srv_->PublishMessage(write_batch_handler.Key(), write_batch_handler.Value());
break;
case kBatchTypePropagate:
if (write_batch_handler.Key() == engine::kPropagateScriptCommand) {
std::vector<std::string> tokens = util::TokenizeRedisProtocol(write_batch_handler.Value());
if (!tokens.empty()) {
auto s = srv_->ExecPropagatedCommand(tokens);
if (!s.IsOK()) {
return s.Prefixed("failed to execute propagate command");
}
}
} else if (write_batch_handler.Key() == kNamespaceDBKey) {
auto s = srv_->GetNamespace()->LoadAndRewrite();
if (!s.IsOK()) {
return s.Prefixed("failed to load namespaces");
}
}
break;
case kBatchTypeStream: {
auto key = write_batch_handler.Key();
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice entry_id = ikey.GetSubKey();
redis::StreamEntryID id;
GetFixed64(&entry_id, &id.ms);
GetFixed64(&entry_id, &id.seq);
srv_->OnEntryAddedToStream(ikey.GetNamespace().ToString(), ikey.GetKey().ToString(), id);
break;
}
case kBatchTypeNone:
break;
}
return Status::OK();
}
bool ReplicationThread::isRestoringError(std::string_view err) {
// err doesn't contain the CRLF, so cannot use redis::Error here.
return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::RedisLoading, redis::errRestoringBackup});
}
bool ReplicationThread::isWrongPsyncNum(std::string_view err) {
// err doesn't contain the CRLF, so cannot use redis::Error here.
return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::NotOK, redis::errWrongNumOfArguments});
}
bool ReplicationThread::isUnknownOption(std::string_view err) {
// err doesn't contain the CRLF, so cannot use redis::Error here.
return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::NotOK, redis::errUnknownOption});
}
rocksdb::Status WriteBatchHandler::PutCF(uint32_t column_family_id, const rocksdb::Slice &key,
const rocksdb::Slice &value) {
type_ = kBatchTypeNone;
if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::PubSub)) {
type_ = kBatchTypePublish;
kv_ = std::make_pair(key.ToString(), value.ToString());
return rocksdb::Status::OK();
} else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Propagate)) {
type_ = kBatchTypePropagate;
kv_ = std::make_pair(key.ToString(), value.ToString());
return rocksdb::Status::OK();
} else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Stream)) {
type_ = kBatchTypeStream;
kv_ = std::make_pair(key.ToString(), value.ToString());
return rocksdb::Status::OK();
}
return rocksdb::Status::OK();
}