Status Cluster::SetClusterNodes()

in src/cluster/cluster.cc [150:226]


Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, bool force) {
  if (version < 0) return {Status::NotOK, errInvalidClusterVersion};

  if (!force) {
    // Low version wants to reset current version
    if (version_ > version) {
      return {Status::NotOK, errInvalidClusterVersion};
    }

    // The same version, it is not needed to update
    if (version_ == version) return Status::OK();
  }

  ClusterNodes nodes;
  std::unordered_map<int, std::string> slots_nodes;
  Status s = parseClusterNodes(nodes_str, &nodes, &slots_nodes);
  if (!s.IsOK()) return s;

  // Update version and cluster topology
  version_ = version;
  nodes_ = nodes;
  size_ = 0;

  // Update slots to nodes
  for (const auto &[slot, node_id] : slots_nodes) {
    slots_nodes_[slot] = nodes_[node_id];
  }

  // Update replicas info and size
  for (const auto &[node_id, node] : nodes_) {
    if (node->role == kClusterSlave) {
      if (nodes_.find(node->master_id) != nodes_.end()) {
        nodes_[node->master_id]->replicas.push_back(node_id);
      }
    }
    if (node->role == kClusterMaster && node->slots.count() > 0) {
      size_++;
    }
  }

  if (myid_.empty() || force) {
    for (const auto &[node_id, node] : nodes_) {
      if (node->port == port_ && util::MatchListeningIP(binds_, node->host)) {
        myid_ = node_id;
        break;
      }
    }
  }

  myself_ = nullptr;
  if (!myid_.empty() && nodes_.find(myid_) != nodes_.end()) {
    myself_ = nodes_[myid_];
  }

  // Set replication relationship
  if (auto s = SetMasterSlaveRepl(); !s.IsOK()) {
    return s.Prefixed("failed to set master-replica replication");
  }

  // Clear data of migrated slots
  if (!migrated_slots_.empty()) {
    engine::Context ctx(srv_->storage);
    for (const auto &[slot, _] : migrated_slots_) {
      if (slots_nodes_[slot] != myself_) {
        auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx, kDefaultNamespace, SlotRange::GetPoint(slot));
        if (!s.ok()) {
          error("failed to clear data of migrated slots: {}", s.ToString());
        }
      }
    }
  }
  // Clear migrated and imported slot info
  migrated_slots_.clear();
  imported_slots_.clear();

  return Status::OK();
}