in src/hbase/connection/client-handler.cc [48:116]
void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
if (LIKELY(buf != nullptr)) {
buf->coalesce();
auto received = std::make_unique<Response>();
pb::ResponseHeader header;
int used_bytes = serde_.ParseDelimited(buf.get(), &header);
VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
<< " has_exception=" << header.has_exception() << ", server: " << server_;
auto resp_msg = resp_msgs_->find_and_erase(header.call_id());
// set the call_id.
// This will be used to by the dispatcher to match up
// the promise with the response.
received->set_call_id(header.call_id());
// If there was an exception then there's no
// data left on the wire.
if (header.has_exception() == false) {
buf->trimStart(used_bytes);
int cell_block_length = 0;
used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get());
if (header.has_cell_block_meta() && header.cell_block_meta().has_length()) {
cell_block_length = header.cell_block_meta().length();
}
VLOG(3) << "Read RPCResponse, buf length:" << buf->length()
<< ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length
<< ", server: " << server_;
// Make sure that bytes were parsed.
CHECK((used_bytes + cell_block_length) == buf->length());
if (cell_block_length > 0) {
auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length);
received->set_cell_scanner(std::shared_ptr<CellScanner>{cell_scanner.release()});
}
received->set_resp_msg(resp_msg);
} else {
hbase::pb::ExceptionResponse exceptionResponse = header.exception();
std::string what;
std::string exception_class_name = exceptionResponse.has_exception_class_name()
? exceptionResponse.exception_class_name()
: "";
std::string stack_trace =
exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : "";
what.append(stack_trace);
auto remote_exception = std::make_unique<RemoteException>(what);
remote_exception->set_exception_class_name(exception_class_name)
->set_stack_trace(stack_trace)
->set_hostname(exceptionResponse.has_hostname() ? exceptionResponse.hostname() : "")
->set_port(exceptionResponse.has_port() ? exceptionResponse.port() : 0);
if (exceptionResponse.has_do_not_retry()) {
remote_exception->set_do_not_retry(exceptionResponse.do_not_retry());
}
VLOG(3) << "Exception RPC ResponseHeader, call_id=" << header.call_id()
<< " exception.what=" << remote_exception->what()
<< ", do_not_retry=" << remote_exception->do_not_retry() << ", server: " << server_;
received->set_exception(folly::exception_wrapper{*remote_exception});
}
ctx->fireRead(std::move(received));
}
}