in src/server/redis_connection.cc [375:587]
// Drains `to_process_cmds` front to back, validating each command and either executing it,
// queueing it (inside MULTI), or answering it with an error through Reply().
//
// Per command, the checks run in this order; a failing check sends an error reply and moves on
// to the next command:
//   1. command lookup (with detection of a likely HTTP request on the RESP connection)
//   2. arity
//   3. authentication / namespace bootstrap
//   4. work guard acquisition (exclusive or concurrent)
//   5. loading state, argument parsing, MULTI restrictions, admin permission, cluster routing
//   6. MULTI queueing (the command is stored in `multi_cmds_` and "QUEUED" is replied)
//   7. read-only replica, max-db-size and stale-data restrictions
//   8. key locking, index recording, execution, index updating
//
// The loop ends early when:
//   - kCloseAfterReply is set and the connection is not in MULTI (remaining commands stay queued);
//   - a likely HTTP request is detected (kCloseAsync is enabled and the function returns);
//   - the executed command returns Status::BlockingCmd (the command object is kept in
//     `saved_current_command_`).
//
// @param to_process_cmds  queue of tokenized commands; entries are popped as they are consumed.
void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
  const Config *config = srv_->GetConfig();
  // Output buffer shared by every command executed in this call; it must be empty whenever a
  // command starts executing.
  std::string reply;
  // Bound by reference, so it reflects the live config value on every iteration.
  // NOTE(review): it is read below before any work guard is taken — confirm this is intended.
  const std::string &password = config->requirepass;

  while (!to_process_cmds->empty()) {
    CommandTokens cmd_tokens = std::move(to_process_cmds->front());
    to_process_cmds->pop_front();
    if (cmd_tokens.empty()) continue;

    bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
    // Once the connection is marked to close after the reply, stop processing further commands
    // unless we are inside MULTI.
    if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec) break;

    auto cmd_s = Server::LookupAndCreateCommand(cmd_tokens.front());
    if (!cmd_s.IsOK()) {
      // `cmd_tokens` is not modified in this branch, so a reference is enough (no copy needed).
      const auto &cmd_name = cmd_tokens.front();
      if (util::EqualICase(cmd_name, "host:") || util::EqualICase(cmd_name, "post")) {
        warn(
            "[connection] A likely HTTP request is detected in the RESP connection, indicating a potential "
            "Cross-Protocol Scripting attack. Connection aborted.");
        EnableFlag(kCloseAsync);
        return;
      }

      if (is_multi_exec) multi_error_ = true;
      Reply(redis::Error(
          {Status::NotOK,
           fmt::format("unknown command `{}`, with args beginning with: {}", cmd_name,
                       util::StringJoin(nonstd::span(cmd_tokens.begin() + 1, cmd_tokens.end()),
                                        [](const auto &v) -> decltype(auto) { return fmt::format("`{}`", v); }))}));
      continue;
    }

    auto current_cmd = std::move(*cmd_s);
    const auto &attributes = current_cmd->GetAttributes();
    auto cmd_name = attributes->name;

    int tokens = static_cast<int>(cmd_tokens.size());
    if (!attributes->CheckArity(tokens)) {
      if (is_multi_exec) multi_error_ = true;
      Reply(redis::Error({Status::NotOK, "wrong number of arguments"}));
      continue;
    }

    auto cmd_flags = attributes->GenerateFlags(cmd_tokens);
    // The connection has no namespace yet: when a password is configured only commands flagged
    // kCmdAuth may run; without a password the connection becomes admin in the default namespace.
    if (GetNamespace().empty()) {
      if (!password.empty()) {
        if (!(cmd_flags & kCmdAuth)) {
          Reply(redis::Error({Status::RedisNoAuth, "Authentication required."}));
          continue;
        }
      } else {
        BecomeAdmin();
        SetNamespace(kDefaultNamespace);
      }
    }

    // At most one of the two guards below is acquired; both are released at the end of this
    // loop iteration (including on `continue`/`break`).
    std::shared_lock<std::shared_mutex> concurrency;  // Allow concurrency
    std::unique_lock<std::shared_mutex> exclusivity;  // Need exclusivity
    // If the command needs to process exclusively, we need to get 'ExclusivityGuard'
    // that can guarantee other threads can't come into critical zone, such as DEBUG,
    // CLUSTER subcommand, CONFIG SET, MULTI, LUA (in the immediate future).
    // Otherwise, we just use 'ConcurrencyGuard' to allow all workers to execute commands at the same time.
    if (is_multi_exec && !(cmd_flags & kCmdBypassMulti)) {
      // No lock guard, because 'exec' command has acquired 'WorkExclusivityGuard'
    } else if (cmd_flags & kCmdExclusive) {
      exclusivity = srv_->WorkExclusivityGuard();
    } else {
      concurrency = srv_->WorkConcurrencyGuard();
    }

    // While the server is loading, only commands flagged kCmdLoading are accepted.
    if (srv_->IsLoading() && !(cmd_flags & kCmdLoading)) {
      Reply(redis::Error({Status::RedisLoading, errRestoringBackup}));
      if (is_multi_exec) multi_error_ = true;
      continue;
    }

    current_cmd->SetArgs(cmd_tokens);
    auto s = current_cmd->Parse();
    if (!s.IsOK()) {
      if (is_multi_exec) multi_error_ = true;
      Reply(redis::Error(s));
      continue;
    }

    if (is_multi_exec && (cmd_flags & kCmdNoMulti)) {
      Reply(redis::Error({Status::NotOK, fmt::format("{} inside MULTI is not allowed", util::ToUpper(cmd_name))}));
      multi_error_ = true;
      continue;
    }

    if ((cmd_flags & kCmdAdmin) && !IsAdmin()) {
      Reply(redis::Error({Status::RedisExecErr, errAdminPermissionRequired}));
      continue;
    }

    if (config->cluster_enabled) {
      s = srv_->cluster->CanExecByMySelf(attributes, cmd_tokens, this);
      if (!s.IsOK()) {
        if (is_multi_exec) multi_error_ = true;
        Reply(redis::Error(s));
        continue;
      }
    }

    // reset the ASKING flag after executing the next query
    if (IsFlagEnabled(kAsking)) {
      DisableFlag(kAsking);
    }

    // We don't execute commands, but queue them, and then execute in EXEC command
    if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdBypassMulti)) {
      multi_cmds_.emplace_back(std::move(cmd_tokens));
      Reply(redis::SimpleString("QUEUED"));
      continue;
    }

    if (config->slave_readonly && srv_->IsSlave() && (cmd_flags & kCmdWrite)) {
      Reply(redis::Error({Status::RedisReadOnly, "You can't write against a read only slave."}));
      continue;
    }

    if ((cmd_flags & kCmdWrite) && !(cmd_flags & kCmdNoDBSizeCheck) && srv_->storage->ReachedDBSizeLimit()) {
      Reply(redis::Error({Status::NotOK, "write command not allowed when reached max-db-size."}));
      continue;
    }

    if (!config->slave_serve_stale_data && srv_->IsSlave() && !IsCmdAllowedInStaleData(cmd_name) &&
        srv_->GetReplicationState() != kReplConnected) {
      Reply(redis::Error({Status::RedisMasterDown,
                          "Link with MASTER is down "
                          "and slave-serve-stale-data is set to 'no'."}));
      continue;
    }

    // `in_script_` is raised only for Script/Function commands; the scope-exit helper is created
    // disabled and enabled in that case so the flag is reset when this iteration ends.
    ScopeExit in_script_exit{[this] { in_script_ = false; }, false};
    if (attributes->category == CommandCategory::Script || attributes->category == CommandCategory::Function) {
      in_script_ = true;
      in_script_exit.Enable();
    }

    SetLastCmd(cmd_name);
    {
      // For write commands, lock every namespaced key the command touches; the guard lives
      // until the end of this inner scope.
      std::optional<MultiLockGuard> guard;
      if (cmd_flags & kCmdWrite) {
        std::vector<std::string> lock_keys;
        attributes->ForEachKeyRange(
            [&lock_keys, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
              key_range.ForEachKey(
                  [&, this](const std::string &key) {
                    auto ns_key = ComposeNamespaceKey(ns_, key, srv_->storage->IsSlotIdEncoded());
                    lock_keys.emplace_back(std::move(ns_key));
                  },
                  args);
            },
            cmd_tokens);

        guard.emplace(srv_->storage->GetLockManager(), lock_keys);
      }
      engine::Context ctx(srv_->storage);

      // Record the touched keys before execution so the indexes can be updated afterwards.
      // Skipped when no index exists, the command is not relevant for indexing, or cluster
      // mode is enabled.
      std::vector<GlobalIndexer::RecordResult> index_records;
      if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(cmd_flags, attributes->category) &&
          !config->cluster_enabled) {
        attributes->ForEachKeyRange(
            [&, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
              key_range.ForEachKey(
                  [&, this](const std::string &key) {
                    auto res = srv_->indexer.Record(ctx, key, ns_);
                    if (res.IsOK()) {
                      index_records.push_back(*res);
                    } else if (!res.Is<Status::NoPrefixMatched>() && !res.Is<Status::TypeMismatched>()) {
                      warn("[connection] index recording failed for key: {}", key);
                    }
                  },
                  args);
            },
            cmd_tokens);
      }

      s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
      // Index updates are best effort: failures are logged and do not affect the command status
      // `s`. A distinct name is used so the outer `s` is not shadowed.
      for (const auto &record : index_records) {
        auto update_status = GlobalIndexer::Update(ctx, record);
        if (!update_status.IsOK() && !update_status.Is<Status::TypeMismatched>()) {
          warn("[connection] index updating failed for key: {}", record.key);
        }
      }
    }

    srv_->FeedMonitorConns(this, cmd_tokens);

    // Break the execution loop when occurring the blocking command like BLPOP or BRPOP,
    // it will suspend the connection and wait for the wakeup signal.
    if (s.Is<Status::BlockingCmd>()) {
      // For the blocking command, it will use the command while resumed from the suspend state.
      // So we need to save the command for the next execution.
      // Migrate connection would also check the saved_current_command_ to determine whether
      // the connection can be migrated or not.
      saved_current_command_ = std::move(current_cmd);
      break;
    }

    // The command failed: report the error instead of its output.
    if (!s.IsOK()) {
      // Drop anything the failed command may have left in the shared buffer, so it cannot be
      // carried over into the next command's output.
      reply.clear();
      Reply(redis::Error(s));
      continue;
    }

    srv_->UpdateWatchedKeysFromArgs(cmd_tokens, *attributes);

    if (!reply.empty()) Reply(reply);
    reply.clear();
  }
}