in src/cluster/cluster.cc [150:226]
// Installs the cluster topology described by `nodes_str` and records `version` as the current one.
//
// Returns NotOK with errInvalidClusterVersion when `version` is negative, or when `force` is false
// and `version` is lower than the current version. When `force` is false and `version` equals the
// current version, returns OK without touching anything. A failure from parseClusterNodes() is
// returned as-is; a failure from SetMasterSlaveRepl() is returned with a prefix, and by that point
// the version, node table and slot table have already been replaced.
Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, bool force) {
  if (version < 0) return {Status::NotOK, errInvalidClusterVersion};

  // Without `force`, the version must not go backwards and an unchanged version is a no-op.
  if (!force) {
    if (version < version_) return {Status::NotOK, errInvalidClusterVersion};
    if (version == version_) return Status::OK();
  }

  // Parse into locals; no member is assigned in this function until parsing has succeeded.
  ClusterNodes parsed_nodes;
  std::unordered_map<int, std::string> slot_owners;
  if (Status parse_status = parseClusterNodes(nodes_str, &parsed_nodes, &slot_owners); !parse_status.IsOK()) {
    return parse_status;
  }

  // Swap in the new version and node table; the size is recounted below.
  version_ = version;
  nodes_ = parsed_nodes;
  size_ = 0;

  // Point every slot listed in the parsed mapping at its owner.
  // Slots that are absent from the mapping keep whatever entry they had before this call.
  for (const auto &[slot_id, owner_id] : slot_owners) {
    slots_nodes_[slot_id] = nodes_[owner_id];
  }

  // Register each replica with its master (when that master is known) and count the masters
  // that hold at least one slot.
  for (const auto &[current_id, current] : nodes_) {
    if (current->role == kClusterSlave) {
      if (auto master_it = nodes_.find(current->master_id); master_it != nodes_.end()) {
        master_it->second->replicas.push_back(current_id);
      }
    }
    if (current->role == kClusterMaster && current->slots.count() > 0) {
      ++size_;
    }
  }

  // (Re)discover which node is this server: the first entry whose port equals ours and whose host
  // is accepted by util::MatchListeningIP() for our bind list. Skipped when the id is already known,
  // unless `force` is set.
  if (force || myid_.empty()) {
    for (const auto &[candidate_id, candidate] : nodes_) {
      if (candidate->port != port_) continue;
      if (!util::MatchListeningIP(binds_, candidate->host)) continue;
      myid_ = candidate_id;
      break;
    }
  }

  // Refresh the handle to our own node; it stays null if our id is unknown or not in the new table.
  myself_ = nullptr;
  if (!myid_.empty()) {
    if (auto self_it = nodes_.find(myid_); self_it != nodes_.end()) {
      myself_ = self_it->second;
    }
  }

  // Re-establish the replication relationship for the new topology.
  if (auto repl_status = SetMasterSlaveRepl(); !repl_status.IsOK()) {
    return repl_status.Prefixed("failed to set master-replica replication");
  }

  // Drop the keys of every migrated slot that the new topology does not assign to this node.
  // A failed cleanup is only logged; it does not fail the call.
  if (!migrated_slots_.empty()) {
    engine::Context ctx(srv_->storage);
    for (const auto &[slot, _] : migrated_slots_) {
      if (slots_nodes_[slot] == myself_) continue;

      if (auto clear_status =
              srv_->slot_migrator->ClearKeysOfSlotRange(ctx, kDefaultNamespace, SlotRange::GetPoint(slot));
          !clear_status.ok()) {
        error("failed to clear data of migrated slots: {}", clear_status.ToString());
      }
    }
  }

  // The migration/import bookkeeping is reset once the new topology is in place.
  migrated_slots_.clear();
  imported_slots_.clear();

  return Status::OK();
}