void ClientHandler::read()

in src/hbase/connection/client-handler.cc [48:116]


void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
  if (LIKELY(buf != nullptr)) {
    buf->coalesce();
    auto received = std::make_unique<Response>();
    pb::ResponseHeader header;

    int used_bytes = serde_.ParseDelimited(buf.get(), &header);
    VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
            << " has_exception=" << header.has_exception() << ", server: " << server_;

    auto resp_msg = resp_msgs_->find_and_erase(header.call_id());

    // set the call_id.
    // This will be used to by the dispatcher to match up
    // the promise with the response.
    received->set_call_id(header.call_id());

    // If there was an exception then there's no
    // data left on the wire.
    if (header.has_exception() == false) {
      buf->trimStart(used_bytes);

      int cell_block_length = 0;
      used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get());
      if (header.has_cell_block_meta() && header.cell_block_meta().has_length()) {
        cell_block_length = header.cell_block_meta().length();
      }

      VLOG(3) << "Read RPCResponse, buf length:" << buf->length()
              << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length
              << ", server: " << server_;

      // Make sure that bytes were parsed.
      CHECK((used_bytes + cell_block_length) == buf->length());

      if (cell_block_length > 0) {
        auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length);
        received->set_cell_scanner(std::shared_ptr<CellScanner>{cell_scanner.release()});
      }

      received->set_resp_msg(resp_msg);
    } else {
      hbase::pb::ExceptionResponse exceptionResponse = header.exception();

      std::string what;
      std::string exception_class_name = exceptionResponse.has_exception_class_name()
                                             ? exceptionResponse.exception_class_name()
                                             : "";
      std::string stack_trace =
          exceptionResponse.has_stack_trace() ? exceptionResponse.stack_trace() : "";
      what.append(stack_trace);

      auto remote_exception = std::make_unique<RemoteException>(what);
      remote_exception->set_exception_class_name(exception_class_name)
          ->set_stack_trace(stack_trace)
          ->set_hostname(exceptionResponse.has_hostname() ? exceptionResponse.hostname() : "")
          ->set_port(exceptionResponse.has_port() ? exceptionResponse.port() : 0);
      if (exceptionResponse.has_do_not_retry()) {
        remote_exception->set_do_not_retry(exceptionResponse.do_not_retry());
      }

      VLOG(3) << "Exception RPC ResponseHeader, call_id=" << header.call_id()
              << " exception.what=" << remote_exception->what()
              << ", do_not_retry=" << remote_exception->do_not_retry() << ", server: " << server_;
      received->set_exception(folly::exception_wrapper{*remote_exception});
    }
    ctx->fireRead(std::move(received));
  }
}