in src/commands/cmd_stream.cc [1421:1511]
Status Parse(const std::vector<std::string> &args) override {
size_t streams_word_idx = 0;
if (util::ToLower(args[1]) != "group") {
return {Status::RedisParseErr, errInvalidSyntax};
}
group_name_ = args[2];
consumer_name_ = args[3];
for (size_t i = 4; i < args.size();) {
auto arg = util::ToLower(args[i]);
if (arg == "streams") {
streams_word_idx = i;
break;
}
if (arg == "count") {
if (i + 1 >= args.size()) {
return {Status::RedisParseErr, errInvalidSyntax};
}
with_count_ = true;
auto parse_result = ParseInt<uint64_t>(args[i + 1], 10);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
count_ = *parse_result;
i += 2;
continue;
}
if (arg == "block") {
if (i + 1 >= args.size()) {
return {Status::RedisParseErr, errInvalidSyntax};
}
block_ = true;
auto parse_result = ParseInt<int64_t>(args[i + 1], 10);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (*parse_result < 0) {
return {Status::RedisParseErr, errTimeoutIsNegative};
}
block_timeout_ = *parse_result;
i += 2;
continue;
}
if (arg == "noack") {
noack_ = true;
}
++i;
}
if (streams_word_idx == 0) {
return {Status::RedisParseErr, errInvalidSyntax};
}
if ((args.size() - streams_word_idx - 1) % 2 != 0) {
return {Status::RedisParseErr, errUnbalancedStreamList};
}
size_t number_of_streams = (args.size() - streams_word_idx - 1) / 2;
for (size_t i = streams_word_idx + 1; i <= streams_word_idx + number_of_streams; ++i) {
streams_.push_back(args[i]);
const auto &id_str = args[i + number_of_streams];
bool get_latest = id_str == ">";
latest_marks_.push_back(get_latest);
if (!get_latest) {
block_ = false;
}
StreamEntryID id;
if (!get_latest) {
auto s = ParseStreamEntryID(id_str, &id);
if (!s.IsOK()) {
return s;
}
}
ids_.push_back(id);
}
return Status::OK();
}