void AsyncScanRpcRetryingCaller::OnComplete()

in src/hbase/client/async-scan-rpc-retrying-caller.cc [168:223]


void AsyncScanRpcRetryingCaller::OnComplete(std::shared_ptr<HBaseRpcController> controller,
                                            std::shared_ptr<pb::ScanResponse> resp,
                                            const std::shared_ptr<CellScanner> cell_scanner) {
  VLOG(5) << "Scan: OnComplete, scanner_id:" << scanner_id_;

  if (controller->Failed()) {
    OnError(controller->exception());
    return;
  }

  bool is_heartbeat = resp->has_heartbeat_message() && resp->heartbeat_message();

  int64_t num_complete_rows_before = results_cache_->num_complete_rows();
  try {
    auto raw_results = ResponseConverter::FromScanResponse(resp, cell_scanner);

    auto results = results_cache_->AddAndGet(raw_results, is_heartbeat);

    auto scan_controller = std::make_shared<ScanControllerImpl>(shared_from_this());

    if (results.size() > 0) {
      UpdateNextStartRowWhenError(*results[results.size() - 1]);
      VLOG(5) << "Calling consumer->OnNext()";
      consumer_->OnNext(results, scan_controller);
    } else if (is_heartbeat) {
      consumer_->OnHeartbeat(scan_controller);
    }

    ScanControllerState state = scan_controller->Destroy();
    if (state == ScanControllerState::kTerminated) {
      if (resp->has_more_results_in_region() && !resp->more_results_in_region()) {
        // we have more results in region but user request to stop the scan, so we need to close the
        // scanner explicitly.
        CloseScanner();
      }
      CompleteNoMoreResults();
      return;
    }

    int64_t num_complete_rows = results_cache_->num_complete_rows() - num_complete_rows_before;
    if (state == ScanControllerState::kSuspended) {
      if (scan_controller->resumer()->Prepare(resp, num_complete_rows)) {
        return;
      }
    }
  } catch (const std::runtime_error& e) {
    // We can not retry here. The server has responded normally and the call sequence has been
    // increased so a new scan with the same call sequence will cause an
    // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
    LOG(WARNING) << "Received exception in reading the scan response:" << e.what();
    CompleteWhenError(true);
    return;
  }

  CompleteOrNext(resp);
}