in src/cluster/slot_migrate.cc [566:668]
// Reads replies from `sock_fd` and checks that `total` complete RESP responses arrive.
//
// The socket gets a 1 second receive timeout, then data is pulled into an event buffer and
// walked with a small state machine driven by `parser_state_`:
//   ArrayLen  -> parse the header line of a reply ('+', ':', '-', '$' or '*')
//   BulkData  -> skip the payload of a bulk string together with its trailing CRLF
//   ArrayData -> skip the lines that belong to an array reply
//   OneRspEnd -> one reply is complete; count it and start over
//
// @param sock_fd  socket to read the replies from, must be >= 0
// @param total    number of replies that are expected, must be > 0
// @return Status::OK() once `total` replies were consumed; Status::NotOK if the arguments are
//         invalid, the timeout cannot be set, the read fails or hits EOF, an error reply ('-')
//         is received, or the data does not look like a RESP reply.
Status SlotMigrator::checkMultipleResponses(int sock_fd, int total) {
  if (sock_fd < 0 || total <= 0) {
    return {Status::NotOK, fmt::format("invalid arguments: sock_fd={}, count={}", sock_fd, total)};
  }

  // Set socket receive timeout first. Without it the blocking read below has no upper bound,
  // so a failure to install the timeout is reported instead of being silently ignored.
  timeval tv{};
  tv.tv_sec = 1;
  tv.tv_usec = 0;
  if (setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
    return {Status::NotOK, fmt::format("failed to set receive timeout: {}", strerror(errno))};
  }

  // Start checking response
  size_t bulk_or_array_len = 0;  // bytes left of a bulk payload, or lines left of an array
  int cnt = 0;                   // number of complete replies seen so far
  parser_state_ = ParserState::ArrayLen;
  UniqueEvbuf evbuf;
  while (true) {
    // Read response data from socket buffer to the event buffer
    const int nread = evbuffer_read(evbuf.get(), sock_fd, -1);
    if (nread == 0) {
      // EOF: errno is not set by a successful zero-length read, so do not format it.
      return {Status::NotOK, "failed to read response: connection closed by peer"};
    }
    if (nread < 0) {
      return {Status::NotOK, fmt::format("failed to read response: {}", strerror(errno))};
    }

    // Parse response data in event buffer
    bool run = true;
    while (run) {
      switch (parser_state_) {
        // Handle the header line of a response
        case ParserState::ArrayLen: {
          UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
          if (!line) {
            info("[migrate] Event buffer is empty, read socket again");
            run = false;
            break;
          }

          const char type = line[0];
          if (type == '-') {
            return {Status::NotOK, fmt::format("got invalid response of length {}: {}", line.length, line.get())};
          } else if (type == '$' || type == '*') {
            // Parse as a signed value so that the RESP null replies "$-1" and "*-1" are recognised.
            auto parse_result = ParseInt<int64_t>(std::string(line.get() + 1, line.length - 1), 10);
            if (!parse_result) {
              return {Status::NotOK, "protocol error: expected integer value"};
            }
            const int64_t len = *parse_result;
            if (len < -1) {
              return {Status::NotOK, "protocol error: invalid bulk or array length"};
            }

            if (len < 0 || (type == '*' && len == 0)) {
              // Null bulk string, null array or empty array: nothing follows the header line.
              bulk_or_array_len = 0;
              parser_state_ = ParserState::OneRspEnd;
            } else if (type == '$') {
              // An empty bulk string ("$0") is still followed by a CRLF which has to be consumed,
              // so it goes through BulkData like any other bulk string.
              bulk_or_array_len = static_cast<size_t>(len);
              parser_state_ = ParserState::BulkData;
            } else {
              bulk_or_array_len = static_cast<size_t>(len);
              parser_state_ = ParserState::ArrayData;
            }
          } else if (type == '+' || type == ':') {
            parser_state_ = ParserState::OneRspEnd;
          } else {
            return {Status::NotOK, fmt::format("got unexpected response of length {}: {}", line.length, line.get())};
          }
          break;
        }
        // Handle bulk string response
        case ParserState::BulkData: {
          // Wait until the payload and its trailing CRLF are both available.
          if (evbuffer_get_length(evbuf.get()) < bulk_or_array_len + 2) {
            info("[migrate] Bulk data in event buffer is not complete, read socket again");
            run = false;
            break;
          }

          // Skip the payload, then verify that it is terminated by "\r\n".
          evbuffer_drain(evbuf.get(), bulk_or_array_len);
          char tail[2] = {0, 0};
          if (evbuffer_remove(evbuf.get(), tail, sizeof(tail)) != 2 || tail[0] != '\r' || tail[1] != '\n') {
            return {Status::NotOK, "protocol error: bulk string is not terminated by CRLF"};
          }
          bulk_or_array_len = 0;
          parser_state_ = ParserState::OneRspEnd;
          break;
        }
        // Handle array response
        case ParserState::ArrayData: {
          // Each array element is consumed as exactly one CRLF-terminated line.
          // NOTE(review): an element that is itself a bulk string spans two lines and would be
          // miscounted here -- confirm that the peer never replies with such arrays.
          while (run && bulk_or_array_len > 0) {
            evbuffer_ptr ptr = evbuffer_search_eol(evbuf.get(), nullptr, nullptr, EVBUFFER_EOL_CRLF_STRICT);
            if (ptr.pos < 0) {
              info("[migrate] Array data in event buffer is not complete, read socket again");
              run = false;
              break;
            }
            evbuffer_drain(evbuf.get(), ptr.pos + 2);
            --bulk_or_array_len;
          }
          // Only finish the reply when all lines were consumed; otherwise resume after the next read.
          if (run) {
            parser_state_ = ParserState::OneRspEnd;
          }
          break;
        }
        case ParserState::OneRspEnd: {
          cnt++;
          if (cnt >= total) {
            return Status::OK();
          }
          parser_state_ = ParserState::ArrayLen;
          break;
        }
        default:
          // No state handled above matches; bail out rather than spinning in this loop forever
          // without consuming any input.
          return {Status::NotOK, "unexpected parser state"};
      }
    }
  }
}