ParseError RedisReply::ConsumePartialIOBuf()

in src/brpc/redis_reply.cpp [92:270]


ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf) {
    if (_type == REDIS_REPLY_ARRAY && _data.array.last_index >= 0) {
        // The parsing was suspended while parsing sub replies,
        // continue the parsing.
        RedisReply* subs = (RedisReply*)_data.array.replies;
        for (int i = _data.array.last_index; i < _length; ++i) {
            ParseError err = subs[i].ConsumePartialIOBuf(buf);
            if (err != PARSE_OK) {
                return err;
            }
            ++_data.array.last_index;
        }
        // We've got an intact reply. reset the index.
        _data.array.last_index = -1;
        return PARSE_OK;
    }

    // Notice that all branches returning PARSE_ERROR_NOT_ENOUGH_DATA must not change `buf'.
    const char* pfc = (const char*)buf.fetch1();
    if (pfc == NULL) {
        return PARSE_ERROR_NOT_ENOUGH_DATA;
    }
    const char fc = *pfc;  // first character
    switch (fc) {
    case '-':   // Error          "-<message>\r\n"
    case '+': { // Simple String  "+<string>\r\n"
        butil::IOBuf str;
        if (buf.cut_until(&str, "\r\n") != 0) {
            const size_t len = buf.size();
            if (len > std::numeric_limits<uint32_t>::max()) {
                LOG(ERROR) << "simple string is too long! max length=2^32-1,"
                              " actually=" << len;
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            return PARSE_ERROR_NOT_ENOUGH_DATA;
        }
        const size_t len = str.size() - 1;
        if (len < sizeof(_data.short_str)) {
            // SSO short strings, including empty string.
            _type = (fc == '-' ? REDIS_REPLY_ERROR : REDIS_REPLY_STATUS);
            _length = len;
            str.copy_to_cstr(_data.short_str, (size_t)-1L, 1/*skip fc*/);
            return PARSE_OK;
        }
        char* d = (char*)_arena->allocate((len/8 + 1)*8);
        if (d == NULL) {
            LOG(FATAL) << "Fail to allocate string[" << len << "]";
            return PARSE_ERROR_ABSOLUTELY_WRONG;
        }
        CHECK_EQ(len, str.copy_to_cstr(d, (size_t)-1L, 1/*skip fc*/));
        _type = (fc == '-' ? REDIS_REPLY_ERROR : REDIS_REPLY_STATUS);
        _length = len;
        _data.long_str = d;
        return PARSE_OK;
    }
    case '$':   // Bulk String   "$<length>\r\n<string>\r\n"
    case '*':   // Array         "*<size>\r\n<sub-reply1><sub-reply2>..."
    case ':': { // Integer       ":<integer>\r\n"
        char intbuf[32];  // enough for fc + 64-bit decimal + \r\n
        const size_t ncopied = buf.copy_to(intbuf, sizeof(intbuf) - 1);
        intbuf[ncopied] = '\0';
        const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n");
        if (crlf_pos == butil::StringPiece::npos) {  // not enough data
            return PARSE_ERROR_NOT_ENOUGH_DATA;
        }
        char* endptr = NULL;
        int64_t value = strtoll(intbuf + 1/*skip fc*/, &endptr, 10);
        if (endptr != intbuf + crlf_pos) {
            LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal";
            return PARSE_ERROR_ABSOLUTELY_WRONG;
        }
        if (fc == ':') {
            buf.pop_front(crlf_pos + 2/*CRLF*/);
            _type = REDIS_REPLY_INTEGER;
            _length = 0;
            _data.integer = value;
            return PARSE_OK;
        } else if (fc == '$') {
            const int64_t len = value;  // `value' is length of the string
            if (len < 0) {  // redis nil
                buf.pop_front(crlf_pos + 2/*CRLF*/);
                _type = REDIS_REPLY_NIL;
                _length = 0;
                _data.integer = 0;
                return PARSE_OK;
            }
            if (len > (int64_t)std::numeric_limits<uint32_t>::max()) {
                LOG(ERROR) << "bulk string is too long! max length=2^32-1,"
                    " actually=" << len;
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            // We provide c_str(), thus even if bulk string is started with
            // length, we have to end it with \0.
            if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) {
                return PARSE_ERROR_NOT_ENOUGH_DATA;
            }
            if ((size_t)len < sizeof(_data.short_str)) {
                // SSO short strings, including empty string.
                _type = REDIS_REPLY_STRING;
                _length = len;
                buf.pop_front(crlf_pos + 2);
                buf.cutn(_data.short_str, len);
                _data.short_str[len] = '\0';
            } else {
                char* d = (char*)_arena->allocate((len/8 + 1)*8);
                if (d == NULL) {
                    LOG(FATAL) << "Fail to allocate string[" << len << "]";
                    return PARSE_ERROR_ABSOLUTELY_WRONG;
                }
                buf.pop_front(crlf_pos + 2/*CRLF*/);
                buf.cutn(d, len);
                d[len] = '\0';
                _type = REDIS_REPLY_STRING;
                _length = len;
                _data.long_str = d;
            }
            char crlf[2];
            buf.cutn(crlf, sizeof(crlf));
            if (crlf[0] != '\r' || crlf[1] != '\n') {
                LOG(ERROR) << "Bulk string is not ended with CRLF";
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            return PARSE_OK;
        } else {
            const int64_t count = value;  // `value' is count of sub replies
            if (count < 0) { // redis nil
                buf.pop_front(crlf_pos + 2/*CRLF*/);
                _type = REDIS_REPLY_NIL;
                _length = 0;
                _data.integer = 0;
                return PARSE_OK;
            }
            if (count == 0) { // empty array
                buf.pop_front(crlf_pos + 2/*CRLF*/);
                _type = REDIS_REPLY_ARRAY;
                _length = 0;
                _data.array.last_index = -1;
                _data.array.replies = NULL;
                return PARSE_OK;
            }
            if (count > (int64_t)std::numeric_limits<uint32_t>::max()) {
                LOG(ERROR) << "Too many sub replies! max count=2^32-1,"
                    " actually=" << count;
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            // FIXME(gejun): Call allocate_aligned instead.
            RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * count);
            if (subs == NULL) {
                LOG(FATAL) << "Fail to allocate RedisReply[" << count << "]";
                return PARSE_ERROR_ABSOLUTELY_WRONG;
            }
            for (int64_t i = 0; i < count; ++i) {
                new (&subs[i]) RedisReply(_arena);
            }
            buf.pop_front(crlf_pos + 2/*CRLF*/);
            _type = REDIS_REPLY_ARRAY;
            _length = count;
            _data.array.replies = subs;

            // Recursively parse sub replies. If any of them fails, it will
            // be continued in next calls by tracking _data.array.last_index.
            _data.array.last_index = 0;
            for (int64_t i = 0; i < count; ++i) {
                ParseError err = subs[i].ConsumePartialIOBuf(buf);
                if (err != PARSE_OK) {
                    return err;
                }
                ++_data.array.last_index;
            }
            _data.array.last_index = -1;
            return PARSE_OK;
        }
    }
    default:
        LOG(ERROR) << "Invalid first character=" << (int)fc;
        return PARSE_ERROR_ABSOLUTELY_WRONG;
    }
    return PARSE_ERROR_ABSOLUTELY_WRONG;
}