in src/cluster/cluster.cc [856:944]
Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn, lua::ScriptRunCtx *script_run_ctx) {
std::vector<int> key_indexes;
attributes->ForEachKeyRange(
[&](const std::vector<std::string> &, redis::CommandKeyRange key_range) {
key_range.ForEachKeyIndex([&](int i) { key_indexes.push_back(i); }, cmd_tokens.size());
},
cmd_tokens);
if (key_indexes.empty()) return Status::OK();
int slot = -1;
for (auto i : key_indexes) {
if (i >= static_cast<int>(cmd_tokens.size())) break;
int cur_slot = GetSlotIdFromKey(cmd_tokens[i]);
if (slot == -1) slot = cur_slot;
if (slot != cur_slot) {
return {Status::RedisCrossSlot, "Attempted to access keys that don't hash to the same slot"};
}
}
if (slot == -1) return Status::OK();
if (slots_nodes_[slot] == nullptr) {
return {Status::RedisClusterDown, "Hash slot not served"};
}
bool cross_slot_ok = false;
if (script_run_ctx) {
if (script_run_ctx->current_slot != -1 && script_run_ctx->current_slot != slot) {
if (getNodeIDBySlot(script_run_ctx->current_slot) != getNodeIDBySlot(slot)) {
return {Status::RedisMoved, fmt::format("{} {}:{}", slot, slots_nodes_[slot]->host, slots_nodes_[slot]->port)};
}
if (!(script_run_ctx->flags & lua::ScriptFlagType::kScriptAllowCrossSlotKeys)) {
return {Status::RedisCrossSlot, "Script attempted to access keys that do not hash to the same slot"};
}
}
script_run_ctx->current_slot = slot;
cross_slot_ok = true;
}
uint64_t flags = attributes->GenerateFlags(cmd_tokens);
if (myself_ && myself_ == slots_nodes_[slot]) {
// We use central controller to manage the topology of the cluster.
// Server can't change the topology directly, so we record the migrated slots
// to move the requests of the migrated slots to the destination node.
if (migrated_slots_.count(slot) > 0) { // I'm not serving the migrated slot
return {Status::RedisMoved, fmt::format("{} {}", slot, migrated_slots_[slot])};
}
// To keep data consistency, slot will be forbidden write while sending the last incremental data.
// During this phase, the requests of the migrating slot has to be rejected.
if ((flags & redis::kCmdWrite) && IsWriteForbiddenSlot(slot)) {
return {Status::RedisTryAgain, "Can't write to slot being migrated which is in write forbidden phase"};
}
return Status::OK(); // I'm serving this slot
}
if (myself_ && myself_->importing_slot_range.Contains(slot) &&
(conn->IsImporting() || conn->IsFlagEnabled(redis::Connection::kAsking))) {
// While data migrating, the topology of the destination node has not been changed.
// The destination node has to serve the requests from the migrating slot,
// although the slot is not belong to itself. Therefore, we record the importing slot
// and mark the importing connection to accept the importing data.
return Status::OK(); // I'm serving the importing connection or asking connection
}
if (myself_ && imported_slots_.count(slot)) {
// After the slot is migrated, new requests of the migrated slot will be moved to
// the destination server. Before the central controller change the topology, the destination
// server should record the imported slots to accept new data of the imported slots.
return Status::OK(); // I'm serving the imported slot
}
if (myself_ && myself_->role == kClusterSlave && !(flags & redis::kCmdWrite) &&
nodes_.find(myself_->master_id) != nodes_.end() && nodes_[myself_->master_id] == slots_nodes_[slot] &&
conn->IsFlagEnabled(redis::Connection::kReadOnly)) {
return Status::OK(); // My master is serving this slot
}
if (!cross_slot_ok) {
return {Status::RedisMoved, fmt::format("{} {}:{}", slot, slots_nodes_[slot]->host, slots_nodes_[slot]->port)};
}
return Status::OK();
}