src/commands/cmd_replication.cc (271 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include <chrono>
#include <thread>

#include "commander.h"
#include "error_constants.h"
#include "io_util.h"
#include "scope_exit.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "thread_util.h"
#include "time_util.h"
#include "unique_fd.h"

namespace redis {

/// PSYNC <next-sequence>
/// PSYNC <replication-id> <next-sequence>
///
/// Sent by a replica asking for incremental synchronization starting at
/// `next-sequence`. In the three-argument form the replica also sends a
/// replication id, which (when `use_rsid_psync` is enabled) is compared with the
/// id recorded in the local WAL for sequence `next-sequence - 1`. If the ids
/// differ or the sequence is outside the local WAL, an error is returned and the
/// replica is expected to fall back to a full sync. Otherwise the connection is
/// detached from its worker and handed to Server::AddSlave.
class CommandPSync : public Commander {
 public:
  Status Parse(const std::vector<std::string> &args) override {
    // Legacy form: `psync <seq>`; new form: `psync <replid> <seq>`.
    size_t seq_arg = 1;
    if (args.size() == 3) {
      seq_arg = 2;
      new_psync_ = true;
    }

    auto parse_result = ParseInt<uint64_t>(args[seq_arg], 10);
    if (!parse_result) {
      return {Status::RedisParseErr, "value is not an unsigned long long or out of range"};
    }

    next_repl_seq_ = static_cast<rocksdb::SequenceNumber>(*parse_result);
    if (new_psync_) {
      assert(args.size() == 3);
      replica_replid_ = args[1];
      if (replica_replid_.size() != kReplIdLength) {
        return {Status::RedisParseErr, "Wrong replication id length"};
      }
    }

    return Commander::Parse(args);
  }

  Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
    info(
        "Slave {}, listening port: {}, announce ip: {} asks for synchronization "
        "with next sequence: {}, replication id: {}, and local sequence: {}",
        conn->GetAddr(), conn->GetListeningPort(), conn->GetAnnounceIP(), next_repl_seq_,
        (replica_replid_.length() ? replica_replid_ : "not supported"), srv->storage->LatestSeqNumber());

    bool need_full_sync = false;

    // Check replication id of the last sequence log
    if (new_psync_ && srv->GetConfig()->use_rsid_psync) {
      // NOTE(review): when next_repl_seq_ is 0 this wraps to the maximum sequence
      // number; presumably GetReplIdFromWalBySeq then returns an id of the wrong
      // length, which skips the check below — confirm.
      std::string replid_in_wal = srv->storage->GetReplIdFromWalBySeq(next_repl_seq_ - 1);
      info("Replication id in WAL: {}", replid_in_wal);

      // We check replication id only when WAL has this sequence, since there may be no WAL,
      // or WAL may have nothing when starting from db of old version kvrocks.
      if (replid_in_wal.length() == kReplIdLength && replid_in_wal != replica_replid_) {
        *output = "wrong replication id of the last log";
        need_full_sync = true;
      }
    }

    // Check Log sequence
    if (!need_full_sync && !checkWALBoundary(srv->storage, next_repl_seq_).IsOK()) {
      *output = "sequence out of range, please use fullsync";
      need_full_sync = true;
    }

    if (need_full_sync) {
      srv->stats.IncrPSyncErrCount();
      return {Status::RedisExecErr, *output};
    }

    // Server would spawn a new thread to sync the batch, and connection would
    // be taken over, so should never trigger any event in worker thread.
    conn->Detach();
    conn->EnableFlag(redis::Connection::kSlave);
    auto s = util::SockSetBlocking(conn->GetFD(), 1);
    if (!s.IsOK()) {
      conn->EnableFlag(redis::Connection::kCloseAsync);
      return s.Prefixed("failed to set blocking mode on socket");
    }

    srv->stats.IncrPSyncOKCount();
    s = srv->AddSlave(conn, next_repl_seq_);
    if (!s.IsOK()) {
      // The AddSlave error is written straight to the (now blocking) socket and `s`
      // is then overwritten, so the status returned below only reports whether that
      // write succeeded. NOTE(review): presumably this avoids a second error reply
      // on the detached connection — confirm before changing.
      std::string err = redis::Error(s);
      s = util::SockSend(conn->GetFD(), err, conn->GetBufferEvent());
      if (!s.IsOK()) {
        warn("failed to send error message to the replica: {}", s.Msg());
      }
      conn->EnableFlag(redis::Connection::kCloseAsync);
      warn("Failed to add replica: {} to start incremental syncing", conn->GetAddr());
    } else {
      info("New replica: {} was added, start incremental syncing", conn->GetAddr());
    }
    return s;
  }

 private:
  rocksdb::SequenceNumber next_repl_seq_ = 0;
  bool new_psync_ = false;
  std::string replica_replid_;

  // Return OK if the seq is in the range of the current WAL: either exactly one past
  // the latest sequence, or the first sequence of a batch the WAL iterator yields.
  static Status checkWALBoundary(engine::Storage *storage, rocksdb::SequenceNumber seq) {
    // Read the latest sequence once so both bound checks compare against the same value.
    const rocksdb::SequenceNumber latest_seq = storage->LatestSeqNumber();
    if (seq == latest_seq + 1) {
      return Status::OK();
    }

    // Upper bound
    if (seq > latest_seq + 1) {
      return {Status::NotOK};
    }

    // Lower bound
    std::unique_ptr<rocksdb::TransactionLogIterator> iter;
    auto s = storage->GetWALIter(seq, &iter);
    if (s.IsOK() && iter->Valid()) {
      auto batch = iter->GetBatch();
      if (seq != batch.sequence) {
        if (seq > batch.sequence) {
          error("checkWALBoundary with sequence: {}, but GetWALIter return older sequence: {}", seq, batch.sequence);
        }
        return {Status::NotOK};
      }
      return Status::OK();
    }
    return {Status::NotOK};
  }
};

/// REPLCONF <option> <value> [<option> <value> ...]
///
/// Lets a replica announce how it can be reached. Supported options are
/// `listening-port` (1 .. PORT_LIMIT-1) and `ip-address` (non-empty); both are
/// stored on the connection. Unknown options are rejected.
class CommandReplConf : public Commander {
 public:
  Status Parse(const std::vector<std::string> &args) override {
    // args[0] is the command name; the remainder must be option/value pairs.
    if (args.size() % 2 == 0) {
      return {Status::RedisParseErr, errWrongNumOfArguments};
    }

    for (size_t i = 1; i < args.size(); i += 2) {
      Status s = ParseParam(util::ToLower(args[i]), args[i + 1]);
      if (!s.IsOK()) {
        return s;
      }
    }

    return Commander::Parse(args);
  }

  /// Validate one option/value pair and remember it for Execute().
  /// @param option lower-cased option name
  /// @param value  raw option value
  Status ParseParam(const std::string &option, const std::string &value) {
    if (option == "listening-port") {
      auto parse_result = ParseInt<int>(value, NumericRange<int>{1, PORT_LIMIT - 1}, 10);
      if (!parse_result) {
        return {Status::RedisParseErr, "listening-port should be number or out of range"};
      }
      port_ = *parse_result;
    } else if (option == "ip-address") {
      if (value.empty()) {
        return {Status::RedisParseErr, "ip-address should not be empty"};
      }
      ip_address_ = value;
    } else {
      return {Status::RedisParseErr, errUnknownOption};
    }
    return Status::OK();
  }

  Status Execute([[maybe_unused]] engine::Context &ctx, [[maybe_unused]] Server *srv, Connection *conn,
                 std::string *output) override {
    // Zero / empty mean "not supplied" and leave the connection untouched.
    if (port_ != 0) {
      conn->SetListeningPort(port_);
    }

    if (!ip_address_.empty()) {
      conn->SetAnnounceIP(ip_address_);
    }

    *output = redis::RESP_OK;
    return Status::OK();
  }

 private:
  int port_ = 0;
  std::string ip_address_;
};

/// _fetch_meta
///
/// Internal full-sync command. A detached "feed-repl-info" thread obtains the full
/// replication data info from ReplDataManager and writes it, followed by CRLF, to
/// the replica socket (or an error reply if it cannot be produced), then refreshes
/// the checkpoint access time.
class CommandFetchMeta : public Commander {
 public:
  Status Parse([[maybe_unused]] const std::vector<std::string> &args) override { return Status::OK(); }

  Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn,
                 [[maybe_unused]] std::string *output) override {
    int repl_fd = conn->GetFD();
    std::string ip = conn->GetAnnounceIP();

    auto s = util::SockSetBlocking(repl_fd, 1);
    if (!s.IsOK()) {
      return s.Prefixed("failed to set blocking mode on socket");
    }

    // The feed thread frees the bufferevent itself, so the connection must not.
    conn->NeedNotFreeBufferEvent();
    conn->EnableFlag(redis::Connection::kCloseAsync);
    srv->stats.IncrFullSyncCount();

    // Feed-replica-meta thread
    auto t = GET_OR_RET(util::CreateThread("feed-repl-info", [srv, repl_fd, ip, bev = conn->GetBufferEvent()] {
      srv->IncrFetchFileThread();
      auto exit = MakeScopeExit([srv, bev] {
        bufferevent_free(bev);
        srv->DecrFetchFileThread();
      });

      std::string files;
      auto s = engine::Storage::ReplDataManager::GetFullReplDataInfo(srv->storage, &files);
      if (!s.IsOK()) {
        warn("[replication] Failed to get full data file info: {}", s.Msg());
        s = util::SockSend(repl_fd, redis::Error({Status::RedisErrorNoPrefix, "can't create db checkpoint"}), bev);
        if (!s.IsOK()) {
          warn("[replication] Failed to send error response: {}", s.Msg());
        }
        return;
      }
      // Send full data file info
      if (auto s = util::SockSend(repl_fd, files + CRLF, bev)) {
        info("[replication] Succeed sending full data file info to {}", ip);
      } else {
        warn("[replication] Fail to send full data file info {}, error: {}", ip, s.Msg());
      }
      auto now_secs = static_cast<time_t>(util::GetTimeStamp());
      srv->storage->SetCheckpointAccessTimeSecs(now_secs);
    }));

    if (auto s = util::ThreadDetach(t); !s) {
      return s;
    }

    return Status::OK();
  }
};

/// _fetch_file <file>[,<file>...]
///
/// Internal full-sync command. A detached "feed-repl-file" thread sends each
/// requested data file as "<size>\r\n" followed by the file content, stopping at
/// the first failure or when the server stops. When `max_replication_mb` is set,
/// the limit is divided among the active fetch threads and the thread sleeps after
/// a file that was sent faster than its share allows.
class CommandFetchFile : public Commander {
 public:
  Status Parse(const std::vector<std::string> &args) override {
    files_str_ = args[1];
    return Status::OK();
  }

  Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn,
                 [[maybe_unused]] std::string *output) override {
    std::vector<std::string> files = util::Split(files_str_, ",");

    int repl_fd = conn->GetFD();
    std::string ip = conn->GetAnnounceIP();

    auto s = util::SockSetBlocking(repl_fd, 1);
    if (!s.IsOK()) {
      return s.Prefixed("failed to set blocking mode on socket");
    }

    conn->NeedNotFreeBufferEvent();  // Feed-replica-file thread will close the replica bufferevent
    conn->EnableFlag(redis::Connection::kCloseAsync);

    auto t = GET_OR_RET(util::CreateThread(
        "feed-repl-file", [srv, repl_fd, ip, files, bev = conn->GetBufferEvent()]() {
          srv->IncrFetchFileThread();
          // Free the bufferevent and balance the thread counter on every exit path,
          // the same way the feed-repl-info thread does.
          auto exit = MakeScopeExit([srv, bev] {
            bufferevent_free(bev);
            srv->DecrFetchFileThread();
          });

          for (const auto &file : files) {
            if (srv->IsStopped()) break;

            uint64_t file_size = 0, max_replication_bytes = 0;
            // The configured limit is shared by all fetch threads; re-read per file
            // since both the config and the thread count may change.
            if (srv->GetConfig()->max_replication_mb > 0 && srv->GetFetchFileThreadNum() != 0) {
              max_replication_bytes = (srv->GetConfig()->max_replication_mb * MiB) / srv->GetFetchFileThreadNum();
            }

            // steady_clock is monotonic, so wall-clock adjustments cannot distort the
            // elapsed time used for throttling.
            auto start = std::chrono::steady_clock::now();
            auto fd = UniqueFD(engine::Storage::ReplDataManager::OpenDataFile(srv->storage, file, &file_size));
            if (!fd) {
              warn("[replication] Fail to open file {} for {}", file, ip);
              break;
            }

            // Send file size and content
            auto s = util::SockSend(repl_fd, std::to_string(file_size) + CRLF, bev);
            if (s) {
              s = util::SockSendFile(repl_fd, *fd, file_size, bev);
            }

            if (s) {
              info("[replication] Succeed sending file {} to {}", file, ip);
            } else {
              warn("[replication] Fail to send file {} to {}, error: {}", file, ip, s.Msg());
              break;
            }
            fd.Close();

            // Sleep if the speed of sending file is more than replication speed limit
            auto end = std::chrono::steady_clock::now();
            uint64_t duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
            if (max_replication_bytes > 0) {
              auto shortest = static_cast<uint64_t>(static_cast<double>(file_size) /
                                                    static_cast<double>(max_replication_bytes) * (1000 * 1000));
              if (duration < shortest) {
                info("[replication] Need to sleep {} ms since of sending files too quickly",
                     (shortest - duration) / 1000);
                // sleep_for instead of usleep: usleep is only specified for values below
                // one second and takes a (commonly 32-bit) useconds_t.
                std::this_thread::sleep_for(
                    std::chrono::microseconds(static_cast<std::chrono::microseconds::rep>(shortest - duration)));
              }
            }
          }

          auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
          srv->storage->SetCheckpointAccessTimeSecs(now_secs);
        }));

    if (auto s = util::ThreadDetach(t); !s) {
      return s;
    }

    return Status::OK();
  }

 private:
  std::string files_str_;
};

/// _db_name
///
/// Replies with the storage name followed by CRLF, written directly through
/// conn->Reply rather than via `output`.
class CommandDBName : public Commander {
 public:
  Status Parse([[maybe_unused]] const std::vector<std::string> &args) override { return Status::OK(); }

  Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn,
                 [[maybe_unused]] std::string *output) override {
    conn->Reply(srv->storage->GetName() + CRLF);
    return Status::OK();
  }
};

REDIS_REGISTER_COMMANDS(Replication, MakeCmdAttr<CommandReplConf>("replconf", -3, "read-only no-script", NO_KEY),
                        MakeCmdAttr<CommandPSync>("psync", -2, "read-only no-multi no-script", NO_KEY),
                        MakeCmdAttr<CommandFetchMeta>("_fetch_meta", 1, "read-only no-multi no-script", NO_KEY),
                        MakeCmdAttr<CommandFetchFile>("_fetch_file", 2, "read-only no-multi no-script", NO_KEY),
                        MakeCmdAttr<CommandDBName>("_db_name", 1, "read-only no-multi", NO_KEY), )

}  // namespace redis