Status Parse()

in src/commands/cmd_stream.cc [1421:1511]


  // Parses the argument vector of a consumer-group stream read (presumably XREADGROUP —
  // the command name itself is not visible in this block). Expected layout:
  //
  //   args[0]  command name (never inspected here)
  //   args[1]  the keyword GROUP (compared after util::ToLower)
  //   args[2]  group name      -> group_name_
  //   args[3]  consumer name   -> consumer_name_
  //   args[4..] any of COUNT <n>, BLOCK <ms>, NOACK, terminated by the keyword STREAMS
  //   after STREAMS: N stream names followed by N entry IDs (">" marks "latest")
  //
  // Returns Status::OK() on success. Returns a RedisParseErr status carrying:
  //   - errInvalidSyntax        if fewer than four arguments are given, args[1] is not GROUP,
  //                             COUNT/BLOCK lacks a value, or STREAMS is missing;
  //   - errValueNotInteger      if the COUNT or BLOCK value is rejected by ParseInt;
  //   - errTimeoutIsNegative    if the BLOCK value is negative;
  //   - errUnbalancedStreamList if an odd number of arguments follows STREAMS.
  // A failure status from ParseStreamEntryID is returned unchanged.
  Status Parse(const std::vector<std::string> &args) override {
    // Index of the first optional argument; everything before it is positional.
    constexpr size_t first_option_idx = 4;

    // args[1..3] are indexed unconditionally below, so reject short argument vectors
    // up front instead of reading past the end of the vector.
    if (args.size() < first_option_idx) {
      return {Status::RedisParseErr, errInvalidSyntax};
    }

    if (util::ToLower(args[1]) != "group") {
      return {Status::RedisParseErr, errInvalidSyntax};
    }
    group_name_ = args[2];
    consumer_name_ = args[3];

    // 0 doubles as "STREAMS not found": the scan starts at index 4, so a real match
    // can never be stored as 0.
    size_t streams_word_idx = 0;

    for (size_t i = first_option_idx; i < args.size();) {
      auto arg = util::ToLower(args[i]);

      if (arg == "streams") {
        // Everything after this keyword is the stream/ID list, handled below.
        streams_word_idx = i;
        break;
      }

      if (arg == "count") {
        // COUNT must be followed by its value.
        if (i + 1 >= args.size()) {
          return {Status::RedisParseErr, errInvalidSyntax};
        }

        with_count_ = true;

        auto parse_result = ParseInt<uint64_t>(args[i + 1], 10);
        if (!parse_result) {
          return {Status::RedisParseErr, errValueNotInteger};
        }

        count_ = *parse_result;
        i += 2;  // skip the keyword and its value
        continue;
      }

      if (arg == "block") {
        // BLOCK must be followed by its timeout value.
        if (i + 1 >= args.size()) {
          return {Status::RedisParseErr, errInvalidSyntax};
        }

        block_ = true;

        // Parsed as signed so that a negative timeout gets its own error message.
        auto parse_result = ParseInt<int64_t>(args[i + 1], 10);
        if (!parse_result) {
          return {Status::RedisParseErr, errValueNotInteger};
        }

        if (*parse_result < 0) {
          return {Status::RedisParseErr, errTimeoutIsNegative};
        }

        block_timeout_ = *parse_result;
        i += 2;  // skip the keyword and its value
        continue;
      }

      if (arg == "noack") {
        noack_ = true;
      }

      // NOTE(review): any token that is not one of the keywords above is skipped
      // silently rather than rejected — confirm this leniency is intended.
      ++i;
    }

    if (streams_word_idx == 0) {
      return {Status::RedisParseErr, errInvalidSyntax};
    }

    // The tail after STREAMS is N names followed by N IDs, so its length must be even.
    const size_t tail_size = args.size() - streams_word_idx - 1;
    if (tail_size % 2 != 0) {
      return {Status::RedisParseErr, errUnbalancedStreamList};
    }

    const size_t number_of_streams = tail_size / 2;

    for (size_t i = streams_word_idx + 1; i <= streams_word_idx + number_of_streams; ++i) {
      streams_.push_back(args[i]);

      // The ID paired with the stream at position i sits number_of_streams slots later.
      const auto &id_str = args[i + number_of_streams];
      const bool get_latest = id_str == ">";
      latest_marks_.push_back(get_latest);

      // For ">" the default-constructed ID is stored; latest_marks_ records that case.
      StreamEntryID id;
      if (!get_latest) {
        // An explicit ID on any stream turns blocking off for the whole command.
        block_ = false;

        auto s = ParseStreamEntryID(id_str, &id);
        if (!s.IsOK()) {
          return s;
        }
      }
      ids_.push_back(id);
    }

    return Status::OK();
  }