in src/brpc/redis_reply.cpp [92:270]
// Parse one redis reply from the front of `buf' into this object.
//
// Returns:
//   PARSE_OK                     - a complete reply was consumed from `buf'.
//   PARSE_ERROR_NOT_ENOUGH_DATA  - `buf' does not hold a complete reply yet;
//                                  call again once more bytes were appended.
//   PARSE_ERROR_ABSOLUTELY_WRONG - the bytes are malformed or an allocation
//                                  failed.
//
// For every type except arrays, `buf' is left untouched when
// PARSE_ERROR_NOT_ENOUGH_DATA is returned. An array pops its header and the
// sub replies parsed so far, and remembers the position in
// _data.array.last_index so that the next call resumes from there.
ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf) {
    if (_type == REDIS_REPLY_ARRAY && _data.array.last_index >= 0) {
        // The parsing was suspended while parsing sub replies,
        // continue the parsing.
        RedisReply* subs = (RedisReply*)_data.array.replies;
        // NOTE(review): `i' is a signed int compared against _length; this
        // presumably relies on the sub-reply count fitting in an int.
        for (int i = _data.array.last_index; i < _length; ++i) {
            ParseError err = subs[i].ConsumePartialIOBuf(buf);
            if (err != PARSE_OK) {
                // last_index still points at the unfinished sub reply.
                return err;
            }
            ++_data.array.last_index;
        }
        // We've got an intact reply. reset the index.
        _data.array.last_index = -1;
        return PARSE_OK;
    }

    // Notice that all branches returning PARSE_ERROR_NOT_ENOUGH_DATA must not
    // change `buf'.
    const char* pfc = (const char*)buf.fetch1();
    if (pfc == NULL) {
        return PARSE_ERROR_NOT_ENOUGH_DATA;
    }
    const char fc = *pfc;  // first character
    switch (fc) {
    case '-':   // Error          "-<message>\r\n"
    case '+': { // Simple String  "+<string>\r\n"
        butil::IOBuf str;
        if (buf.cut_until(&str, "\r\n") != 0) {
            // No CRLF yet. Give up if the pending data is already longer
            // than the maximum length that a reply can record.
            const size_t len = buf.size();
            if (len > std::numeric_limits<uint32_t>::max()) {
                LOG(ERROR) << "simple string is too long! max length=2^32-1,"
                              " actually=" << len;
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            return PARSE_ERROR_NOT_ENOUGH_DATA;
        }
        // `str' still starts with fc, which is not part of the payload.
        const size_t len = str.size() - 1;
        if (len < sizeof(_data.short_str)) {
            // SSO short strings, including empty string.
            _type = (fc == '-' ? REDIS_REPLY_ERROR : REDIS_REPLY_STATUS);
            _length = len;
            str.copy_to_cstr(_data.short_str, (size_t)-1L, 1/*skip fc*/);
            return PARSE_OK;
        }
        // Round up to a multiple of 8, always leaving room for the ending \0.
        char* d = (char*)_arena->allocate((len/8 + 1)*8);
        if (d == NULL) {
            LOG(FATAL) << "Fail to allocate string[" << len << "]";
            return PARSE_ERROR_ABSOLUTELY_WRONG;
        }
        CHECK_EQ(len, str.copy_to_cstr(d, (size_t)-1L, 1/*skip fc*/));
        _type = (fc == '-' ? REDIS_REPLY_ERROR : REDIS_REPLY_STATUS);
        _length = len;
        _data.long_str = d;
        return PARSE_OK;
    }
    case '$':   // Bulk String   "$<length>\r\n<string>\r\n"
    case '*':   // Array         "*<size>\r\n<sub-reply1><sub-reply2>..."
    case ':': { // Integer       ":<integer>\r\n"
        // All three types start with "<fc><decimal>\r\n". Peek at the head
        // of `buf' without consuming anything.
        char intbuf[32];  // enough for fc + 64-bit decimal + \r\n
        const size_t ncopied = buf.copy_to(intbuf, sizeof(intbuf) - 1);
        intbuf[ncopied] = '\0';
        const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n");
        if (crlf_pos == butil::StringPiece::npos) {
            if (ncopied == sizeof(intbuf) - 1) {
                // The probe window is full and still has no CRLF: the field
                // is longer than any 64-bit decimal. More data can never
                // make it valid because following calls would inspect the
                // very same bytes, so fail instead of waiting forever.
                LOG(ERROR) << "No CRLF found in the first " << ncopied
                           << " bytes of a reply starting with `" << fc << "'";
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            return PARSE_ERROR_NOT_ENOUGH_DATA;  // not enough data
        }
        const char* const digits = intbuf + 1/*skip fc*/;
        char* endptr = NULL;
        // NOTE(review): out-of-range values are saturated by strtoll and are
        // not detected here.
        int64_t value = strtoll(digits, &endptr, 10);
        // `endptr == digits' means that strtoll converted nothing, which
        // would otherwise pass the position check when the field is empty
        // (e.g. ":\r\n") and be silently treated as 0.
        if (endptr == digits || endptr != intbuf + crlf_pos) {
            LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal";
            return PARSE_ERROR_ABSOLUTELY_WRONG;
        }
        if (fc == ':') {
            buf.pop_front(crlf_pos + 2/*CRLF*/);
            _type = REDIS_REPLY_INTEGER;
            _length = 0;
            _data.integer = value;
            return PARSE_OK;
        } else if (fc == '$') {
            const int64_t len = value;  // `value' is length of the string
            if (len < 0) {  // redis nil
                buf.pop_front(crlf_pos + 2/*CRLF*/);
                _type = REDIS_REPLY_NIL;
                _length = 0;
                _data.integer = 0;
                return PARSE_OK;
            }
            if (len > (int64_t)std::numeric_limits<uint32_t>::max()) {
                LOG(ERROR) << "bulk string is too long! max length=2^32-1,"
                              " actually=" << len;
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            // We provide c_str(), thus even if bulk string is started with
            // length, we have to end it with \0.
            // Wait until header, payload and ending CRLF are all present so
            // that `buf' is not modified on partial data.
            if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) {
                return PARSE_ERROR_NOT_ENOUGH_DATA;
            }
            if ((size_t)len < sizeof(_data.short_str)) {
                // SSO short strings, including empty string.
                _type = REDIS_REPLY_STRING;
                _length = len;
                buf.pop_front(crlf_pos + 2);
                buf.cutn(_data.short_str, len);
                _data.short_str[len] = '\0';
            } else {
                // Round up to a multiple of 8, leaving room for the \0.
                char* d = (char*)_arena->allocate((len/8 + 1)*8);
                if (d == NULL) {
                    LOG(FATAL) << "Fail to allocate string[" << len << "]";
                    return PARSE_ERROR_ABSOLUTELY_WRONG;
                }
                buf.pop_front(crlf_pos + 2/*CRLF*/);
                buf.cutn(d, len);
                d[len] = '\0';
                _type = REDIS_REPLY_STRING;
                _length = len;
                _data.long_str = d;
            }
            // The payload must be followed by CRLF.
            char crlf[2];
            buf.cutn(crlf, sizeof(crlf));
            if (crlf[0] != '\r' || crlf[1] != '\n') {
                LOG(ERROR) << "Bulk string is not ended with CRLF";
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            return PARSE_OK;
        } else {
            const int64_t count = value;  // `value' is count of sub replies
            if (count < 0) {  // redis nil
                buf.pop_front(crlf_pos + 2/*CRLF*/);
                _type = REDIS_REPLY_NIL;
                _length = 0;
                _data.integer = 0;
                return PARSE_OK;
            }
            if (count == 0) {  // empty array
                buf.pop_front(crlf_pos + 2/*CRLF*/);
                _type = REDIS_REPLY_ARRAY;
                _length = 0;
                _data.array.last_index = -1;
                _data.array.replies = NULL;
                return PARSE_OK;
            }
            if (count > (int64_t)std::numeric_limits<uint32_t>::max()) {
                LOG(ERROR) << "Too many sub replies! max count=2^32-1,"
                              " actually=" << count;
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            // NOTE(review): `count' comes from the peer and the allocation
            // below is proportional to it; consider bounding it.
            // FIXME(gejun): Call allocate_aligned instead.
            RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * count);
            if (subs == NULL) {
                LOG(FATAL) << "Fail to allocate RedisReply[" << count << "]";
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            // The arena returns raw memory, construct each sub reply in place.
            for (int64_t i = 0; i < count; ++i) {
                new (&subs[i]) RedisReply(_arena);
            }
            buf.pop_front(crlf_pos + 2/*CRLF*/);
            _type = REDIS_REPLY_ARRAY;
            _length = count;
            _data.array.replies = subs;

            // Recursively parse sub replies. If any of them fails, it will
            // be continued in next calls by tracking _data.array.last_index.
            _data.array.last_index = 0;
            for (int64_t i = 0; i < count; ++i) {
                ParseError err = subs[i].ConsumePartialIOBuf(buf);
                if (err != PARSE_OK) {
                    return err;
                }
                ++_data.array.last_index;
            }
            // All sub replies are parsed, mark the array as complete.
            _data.array.last_index = -1;
            return PARSE_OK;
        }
    }
    default:
        LOG(ERROR) << "Invalid first character=" << (int)fc;
        return PARSE_ERROR_ABSOLUTELY_WRONG;
    }
    // Not reachable: every case above returns.
    return PARSE_ERROR_ABSOLUTELY_WRONG;
}