Status Cluster::CanExecByMySelf()

in src/cluster/cluster.cc [856:944]


/// Decides whether this node may execute the given command locally.
///
/// The key positions of the command are collected from `attributes`, every key is mapped to its
/// hash slot, and the resulting slot is checked against the slot table, the migrated/imported
/// slot records and the state of the issuing connection.
///
/// @param attributes      command metadata; supplies the key ranges and the command flags.
/// @param cmd_tokens      the command's tokens; key positions index into this vector.
/// @param conn            connection issuing the command; queried for its importing state and
///                        for the kAsking / kReadOnly flags.
/// @param script_run_ctx  optional (may be null) script context; when present, its `current_slot`
///                        is compared with the slot of this command and then updated to it.
/// @return Status::OK() when the command has no keys or may run on this node; otherwise
///         RedisCrossSlot, RedisClusterDown, RedisMoved or RedisTryAgain (see inline comments).
Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
                                redis::Connection *conn, lua::ScriptRunCtx *script_run_ctx) {
  // Gather the position of every key argument declared by the command.
  std::vector<int> key_positions;
  attributes->ForEachKeyRange(
      [&](const std::vector<std::string> &, redis::CommandKeyRange range) {
        range.ForEachKeyIndex([&](int pos) { key_positions.push_back(pos); }, cmd_tokens.size());
      },
      cmd_tokens);

  if (key_positions.empty()) {
    return Status::OK();  // nothing to route for a command without keys
  }

  // All keys must hash to one slot; stop scanning at the first position past the token list.
  const auto token_count = static_cast<int>(cmd_tokens.size());
  int slot = -1;
  for (const int pos : key_positions) {
    if (pos >= token_count) break;

    const int key_slot = GetSlotIdFromKey(cmd_tokens[pos]);
    if (slot == -1) {
      slot = key_slot;
    } else if (slot != key_slot) {
      return {Status::RedisCrossSlot, "Attempted to access keys that don't hash to the same slot"};
    }
  }
  if (slot == -1) {
    return Status::OK();  // no key position was usable
  }

  const auto &owner = slots_nodes_[slot];
  if (owner == nullptr) {
    return {Status::RedisClusterDown, "Hash slot not served"};
  }

  // Redirect reply pointing the client at the node recorded as the owner of `slot`.
  const auto moved_to_owner = [&]() -> Status {
    return {Status::RedisMoved, fmt::format("{} {}:{}", slot, owner->host, owner->port)};
  };

  bool cross_slot_ok = false;
  if (script_run_ctx != nullptr) {
    const auto previous_slot = script_run_ctx->current_slot;
    if (previous_slot != -1 && previous_slot != slot) {
      // The script already touched a different slot: both slots must map to the same node ID,
      // and the script must carry the flag that allows cross-slot keys.
      if (getNodeIDBySlot(previous_slot) != getNodeIDBySlot(slot)) {
        return moved_to_owner();
      }
      if (!(script_run_ctx->flags & lua::ScriptFlagType::kScriptAllowCrossSlotKeys)) {
        return {Status::RedisCrossSlot, "Script attempted to access keys that do not hash to the same slot"};
      }
    }

    script_run_ctx->current_slot = slot;
    cross_slot_ok = true;
  }

  const uint64_t flags = attributes->GenerateFlags(cmd_tokens);
  const bool is_write = (flags & redis::kCmdWrite) != 0;

  if (myself_) {
    if (myself_ == owner) {
      // We use central controller to manage the topology of the cluster.
      // Server can't change the topology directly, so the migrated slots are recorded
      // in order to move requests for them to the destination node.
      const auto migrated = migrated_slots_.find(slot);
      if (migrated != migrated_slots_.end()) {  // I'm not serving the migrated slot
        return {Status::RedisMoved, fmt::format("{} {}", slot, migrated->second)};
      }
      // To keep data consistent, writes to the slot are forbidden while the last incremental
      // data is being sent; write requests to the migrating slot are rejected in that phase.
      if (is_write && IsWriteForbiddenSlot(slot)) {
        return {Status::RedisTryAgain, "Can't write to slot being migrated which is in write forbidden phase"};
      }

      return Status::OK();  // I'm serving this slot
    }

    // While data is migrating, the topology of the destination node has not been changed yet.
    // The destination node has to serve requests for the migrating slot although the slot does
    // not belong to it, so the importing slot is recorded and importing/asking connections
    // are accepted.
    if (myself_->importing_slot_range.Contains(slot) &&
        (conn->IsImporting() || conn->IsFlagEnabled(redis::Connection::kAsking))) {
      return Status::OK();  // I'm serving the importing connection or asking connection
    }

    // After a slot is migrated, new requests for it are moved to the destination server.
    // Until the central controller changes the topology, the destination server keeps a record
    // of imported slots so that it accepts new data for them.
    if (imported_slots_.count(slot)) {
      return Status::OK();  // I'm serving the imported slot
    }

    // A slave may answer non-write commands for a slot owned by its master, provided the
    // connection has the kReadOnly flag enabled.
    if (myself_->role == kClusterSlave && !is_write) {
      const auto master = nodes_.find(myself_->master_id);
      if (master != nodes_.end() && master->second == owner &&
          conn->IsFlagEnabled(redis::Connection::kReadOnly)) {
        return Status::OK();  // My master is serving this slot
      }
    }
  }

  // None of the local-serving rules matched: only commands issued with a script context pass.
  if (cross_slot_ok) {
    return Status::OK();
  }
  return moved_to_owner();
}