void Connection::ExecuteCommands()

in src/server/redis_connection.cc [375:587]


// Executes every command queued in `to_process_cmds` on this connection.
//
// Each entry is popped from the front of the queue and taken through the same pipeline:
//   1. command lookup and arity check;
//   2. authentication check / default-namespace bootstrap;
//   3. acquisition of the server's work guard (exclusive or shared);
//   4. argument parsing, MULTI/admin/cluster checks;
//   5. queueing (inside MULTI, before EXEC) or actual execution, followed by the reply.
// A command rejected at any step gets an error reply and the loop moves on to the next entry.
//
// The loop ends early when:
//   - kCloseAfterReply is set and the connection is not in MULTI;
//   - an unknown command looks like an HTTP request (kCloseAsync is set and the function returns);
//   - a command returns Status::BlockingCmd (the command object is kept in saved_current_command_).
//
// `to_process_cmds` is dereferenced unconditionally, so it must not be null; consumed entries are removed from it.
void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
  const Config *config = srv_->GetConfig();
  std::string reply;
  const std::string &password = config->requirepass;

  while (!to_process_cmds->empty()) {
    CommandTokens cmd_tokens = std::move(to_process_cmds->front());
    to_process_cmds->pop_front();
    if (cmd_tokens.empty()) continue;

    // Snapshot the MULTI state once per command; the checks below all use this value.
    bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
    if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec) break;

    auto cmd_s = Server::LookupAndCreateCommand(cmd_tokens.front());
    if (!cmd_s.IsOK()) {
      // `cmd_tokens` is not modified on this path, so a reference to its first token is enough.
      const auto &cmd_name = cmd_tokens.front();
      if (util::EqualICase(cmd_name, "host:") || util::EqualICase(cmd_name, "post")) {
        warn(
            "[connection] A likely HTTP request is detected in the RESP connection, indicating a potential "
            "Cross-Protocol Scripting attack. Connection aborted.");
        EnableFlag(kCloseAsync);
        return;
      }
      if (is_multi_exec) multi_error_ = true;
      Reply(redis::Error(
          {Status::NotOK,
           fmt::format("unknown command `{}`, with args beginning with: {}", cmd_name,
                       util::StringJoin(nonstd::span(cmd_tokens.begin() + 1, cmd_tokens.end()),
                                        [](const auto &v) -> decltype(auto) { return fmt::format("`{}`", v); }))}));
      continue;
    }
    auto current_cmd = std::move(*cmd_s);

    const auto &attributes = current_cmd->GetAttributes();
    // Bind by reference instead of copying the name for every command: `attributes` is still
    // dereferenced after the last use of `cmd_name` in this iteration.
    const auto &cmd_name = attributes->name;

    int tokens = static_cast<int>(cmd_tokens.size());
    if (!attributes->CheckArity(tokens)) {
      if (is_multi_exec) multi_error_ = true;
      Reply(redis::Error({Status::NotOK, "wrong number of arguments"}));
      continue;
    }

    auto cmd_flags = attributes->GenerateFlags(cmd_tokens);
    // An empty namespace means this connection has not been bound to one yet.
    if (GetNamespace().empty()) {
      if (!password.empty()) {
        // A password is configured: only commands flagged kCmdAuth may run before the namespace is set.
        if (!(cmd_flags & kCmdAuth)) {
          Reply(redis::Error({Status::RedisNoAuth, "Authentication required."}));
          continue;
        }
      } else {
        // No password configured: the connection becomes admin on the default namespace.
        BecomeAdmin();
        SetNamespace(kDefaultNamespace);
      }
    }

    std::shared_lock<std::shared_mutex> concurrency;  // Allow concurrency
    std::unique_lock<std::shared_mutex> exclusivity;  // Need exclusivity
    // If the command needs to process exclusively, we need to get 'ExclusivityGuard'
    // that can guarantee other threads can't come into critical zone, such as DEBUG,
    // CLUSTER subcommand, CONFIG SET, MULTI, LUA (in the immediate future).
    // Otherwise, we just use 'ConcurrencyGuard' to allow all workers to execute commands at the same time.
    if (is_multi_exec && !(cmd_flags & kCmdBypassMulti)) {
      // No lock guard, because 'exec' command has acquired 'WorkExclusivityGuard'
    } else if (cmd_flags & kCmdExclusive) {
      exclusivity = srv_->WorkExclusivityGuard();
    } else {
      concurrency = srv_->WorkConcurrencyGuard();
    }

    if (srv_->IsLoading() && !(cmd_flags & kCmdLoading)) {
      Reply(redis::Error({Status::RedisLoading, errRestoringBackup}));
      if (is_multi_exec) multi_error_ = true;
      continue;
    }

    current_cmd->SetArgs(cmd_tokens);
    auto s = current_cmd->Parse();
    if (!s.IsOK()) {
      if (is_multi_exec) multi_error_ = true;
      Reply(redis::Error(s));
      continue;
    }

    if (is_multi_exec && (cmd_flags & kCmdNoMulti)) {
      Reply(redis::Error({Status::NotOK, fmt::format("{} inside MULTI is not allowed", util::ToUpper(cmd_name))}));
      multi_error_ = true;
      continue;
    }

    // NOTE(review): unlike the rejections above, this one does not set multi_error_ when the
    // connection is inside MULTI -- confirm that this is intended.
    if ((cmd_flags & kCmdAdmin) && !IsAdmin()) {
      Reply(redis::Error({Status::RedisExecErr, errAdminPermissionRequired}));
      continue;
    }

    if (config->cluster_enabled) {
      s = srv_->cluster->CanExecByMySelf(attributes, cmd_tokens, this);
      if (!s.IsOK()) {
        if (is_multi_exec) multi_error_ = true;
        Reply(redis::Error(s));
        continue;
      }
    }

    // reset the ASKING flag after executing the next query
    if (IsFlagEnabled(kAsking)) {
      DisableFlag(kAsking);
    }

    // We don't execute commands, but queue them, and then execute in EXEC command
    if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdBypassMulti)) {
      multi_cmds_.emplace_back(std::move(cmd_tokens));
      Reply(redis::SimpleString("QUEUED"));
      continue;
    }

    // The checks below sit after the queueing step, so they are only reached by commands
    // that are about to be executed.
    if (config->slave_readonly && srv_->IsSlave() && (cmd_flags & kCmdWrite)) {
      Reply(redis::Error({Status::RedisReadOnly, "You can't write against a read only slave."}));
      continue;
    }

    if ((cmd_flags & kCmdWrite) && !(cmd_flags & kCmdNoDBSizeCheck) && srv_->storage->ReachedDBSizeLimit()) {
      Reply(redis::Error({Status::NotOK, "write command not allowed when reached max-db-size."}));
      continue;
    }

    if (!config->slave_serve_stale_data && srv_->IsSlave() && !IsCmdAllowedInStaleData(cmd_name) &&
        srv_->GetReplicationState() != kReplConnected) {
      Reply(redis::Error({Status::RedisMasterDown,
                          "Link with MASTER is down "
                          "and slave-serve-stale-data is set to 'no'."}));
      continue;
    }

    // in_script_ is raised for Script/Function commands; the scope guard, enabled only in
    // that case, resets it when this iteration's scope is left.
    ScopeExit in_script_exit{[this] { in_script_ = false; }, false};
    if (attributes->category == CommandCategory::Script || attributes->category == CommandCategory::Function) {
      in_script_ = true;
      in_script_exit.Enable();
    }

    SetLastCmd(cmd_name);
    {
      // For write commands, collect the namespaced form of every key the command touches
      // and hold a MultiLockGuard on them until this inner scope ends.
      std::optional<MultiLockGuard> guard;
      if (cmd_flags & kCmdWrite) {
        std::vector<std::string> lock_keys;
        attributes->ForEachKeyRange(
            [&lock_keys, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
              key_range.ForEachKey(
                  [&, this](const std::string &key) {
                    auto ns_key = ComposeNamespaceKey(ns_, key, srv_->storage->IsSlotIdEncoded());
                    lock_keys.emplace_back(std::move(ns_key));
                  },
                  args);
            },
            cmd_tokens);

        guard.emplace(srv_->storage->GetLockManager(), lock_keys);
      }
      engine::Context ctx(srv_->storage);

      // Record the indexed keys before executing; the records are fed to GlobalIndexer::Update
      // once the command has run. Skipped when no index exists or cluster mode is enabled.
      std::vector<GlobalIndexer::RecordResult> index_records;
      if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(cmd_flags, attributes->category) &&
          !config->cluster_enabled) {
        attributes->ForEachKeyRange(
            [&, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
              key_range.ForEachKey(
                  [&, this](const std::string &key) {
                    auto res = srv_->indexer.Record(ctx, key, ns_);
                    if (res.IsOK()) {
                      index_records.push_back(*res);
                    } else if (!res.Is<Status::NoPrefixMatched>() && !res.Is<Status::TypeMismatched>()) {
                      warn("[connection] index recording failed for key: {}", key);
                    }
                  },
                  args);
            },
            cmd_tokens);
      }

      s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
      for (const auto &record : index_records) {
        // Named distinctly so it does not shadow `s`, the command status checked after this scope.
        auto update_status = GlobalIndexer::Update(ctx, record);
        if (!update_status.IsOK() && !update_status.Is<Status::TypeMismatched>()) {
          warn("[connection] index updating failed for key: {}", record.key);
        }
      }
    }

    srv_->FeedMonitorConns(this, cmd_tokens);

    // Break the execution loop when occurring the blocking command like BLPOP or BRPOP,
    // it will suspend the connection and wait for the wakeup signal.
    if (s.Is<Status::BlockingCmd>()) {
      // For the blocking command, it will use the command while resumed from the suspend state.
      // So we need to save the command for the next execution.
      // Migrate connection would also check the saved_current_command_ to determine whether
      // the connection can be migrated or not.
      saved_current_command_ = std::move(current_cmd);
      break;
    }

    // The command failed: report the error instead of its output.
    if (!s.IsOK()) {
      // Drop whatever the failed command may have written, so it cannot leak into the reply
      // of a later command that appends to the shared buffer.
      reply.clear();
      Reply(redis::Error(s));
      continue;
    }

    srv_->UpdateWatchedKeysFromArgs(cmd_tokens, *attributes);

    if (!reply.empty()) Reply(reply);
    reply.clear();
  }
}