Status Request::Tokenize()

in src/server/redis_request.cc [39:140]


Status Request::Tokenize(evbuffer *input) {
  size_t pipeline_size = 0;

  while (true) {
    switch (state_) {
      case ArrayLen: {
        bool is_only_lf = true;
        // We don't use the `EVBUFFER_EOL_CRLF_STRICT` here since only LF is allowed in INLINE protocol.
        // So we need to search LF EOL and figure out current line has CR or not.
        UniqueEvbufReadln line(input, EVBUFFER_EOL_LF);
        if (line && line.length > 0 && line[line.length - 1] == '\r') {
          // remove `\r` if exists
          --line.length;
          is_only_lf = false;
        }

        if (!line || line.length <= 0) {
          if (pipeline_size > 128) {
            info("[request] Large pipeline detected: {}", pipeline_size);
          }
          if (line) {
            continue;
          }
          return Status::OK();
        }

        pipeline_size++;
        srv_->stats.IncrInboundBytes(line.length);
        if (line[0] == '*') {
          auto parse_result = ParseInt<int64_t>(std::string(line.get() + 1, line.length - 1), 10);
          if (!parse_result) {
            return {Status::NotOK, "Protocol error: invalid multibulk length"};
          }

          multi_bulk_len_ = *parse_result;
          if (is_only_lf || multi_bulk_len_ > (int64_t)PROTO_MULTI_MAX_SIZE) {
            return {Status::NotOK, "Protocol error: invalid multibulk length"};
          }

          if (multi_bulk_len_ <= 0) {
            multi_bulk_len_ = 0;
            continue;
          }

          state_ = BulkLen;
        } else {
          if (line.length > PROTO_INLINE_MAX_SIZE) {
            return {Status::NotOK, "Protocol error: invalid bulk length"};
          }

          auto arguments = util::SplitArguments(line.get());
          if (!arguments.IsOK()) {
            return {Status::NotOK, "Protocol error: " + arguments.Msg()};
          }
          tokens_ = std::move(arguments.GetValue());
          if (tokens_.empty()) continue;
          commands_.emplace_back(std::move(tokens_));
          state_ = ArrayLen;
        }
        break;
      }
      case BulkLen: {
        UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
        if (!line || line.length <= 0) return Status::OK();

        srv_->stats.IncrInboundBytes(line.length);
        if (line[0] != '$') {
          return {Status::NotOK, "Protocol error: expected '$'"};
        }

        auto parse_result = ParseInt<uint64_t>(std::string(line.get() + 1, line.length - 1), 10);
        if (!parse_result) {
          return {Status::NotOK, "Protocol error: invalid bulk length"};
        }

        bulk_len_ = *parse_result;
        if (bulk_len_ > srv_->GetConfig()->proto_max_bulk_len) {
          return {Status::NotOK, "Protocol error: invalid bulk length"};
        }

        state_ = BulkData;
        break;
      }
      case BulkData:
        if (evbuffer_get_length(input) < bulk_len_ + 2) return Status::OK();

        char *data = reinterpret_cast<char *>(evbuffer_pullup(input, static_cast<ssize_t>(bulk_len_ + 2)));
        tokens_.emplace_back(data, bulk_len_);
        evbuffer_drain(input, bulk_len_ + 2);
        srv_->stats.IncrInboundBytes(bulk_len_ + 2);
        --multi_bulk_len_;
        if (multi_bulk_len_ == 0) {
          state_ = ArrayLen;
          commands_.emplace_back(std::move(tokens_));
          tokens_.clear();
        } else {
          state_ = BulkLen;
        }
        break;
    }
  }
}