in src/hbase/client/async-scan-rpc-retrying-caller.cc [168:223]
// Handles the response of one scan RPC.
//
// Flow:
//   1. If the controller reports a failure, forward its exception to OnError().
//   2. Otherwise convert the response into results, run them through the
//      results cache and hand them to the consumer: OnNext() when there is at
//      least one result, OnHeartbeat() when there is none and the response is
//      a heartbeat message.
//   3. Inspect the state the consumer left the scan controller in:
//        - kTerminated: close the scanner if the region still has results,
//          then complete the scan with CompleteNoMoreResults().
//        - kSuspended: let the resumer take over if Prepare() accepts the
//          response; otherwise fall through.
//        - anything else: fall through.
//   4. Fall-through path: CompleteOrNext(resp).
//
// @param controller    RPC controller of the finished call; checked for failure.
// @param resp          The scan response received for the call.
// @param cell_scanner  Passed together with resp to
//                      ResponseConverter::FromScanResponse().
void AsyncScanRpcRetryingCaller::OnComplete(std::shared_ptr<HBaseRpcController> controller,
                                            std::shared_ptr<pb::ScanResponse> resp,
                                            const std::shared_ptr<CellScanner> cell_scanner) {
  VLOG(5) << "Scan: OnComplete, scanner_id:" << scanner_id_;

  if (controller->Failed()) {
    OnError(controller->exception());
    return;
  }

  bool is_heartbeat = resp->has_heartbeat_message() && resp->heartbeat_message();

  // Snapshot taken before the cache is updated so that the number of complete
  // rows contributed by this response can be computed afterwards.
  int64_t num_complete_rows_before = results_cache_->num_complete_rows();
  try {
    auto raw_results = ResponseConverter::FromScanResponse(resp, cell_scanner);
    auto results = results_cache_->AddAndGet(raw_results, is_heartbeat);
    auto scan_controller = std::make_shared<ScanControllerImpl>(shared_from_this());

    if (!results.empty()) {
      UpdateNextStartRowWhenError(*results.back());
      VLOG(5) << "Calling consumer->OnNext()";
      consumer_->OnNext(results, scan_controller);
    } else if (is_heartbeat) {
      consumer_->OnHeartbeat(scan_controller);
    }

    ScanControllerState state = scan_controller->Destroy();
    if (state == ScanControllerState::kTerminated) {
      if (resp->has_more_results_in_region() && resp->more_results_in_region()) {
        // The region still has more results but the user requested to stop the
        // scan, so the scanner has to be closed explicitly.
        CloseScanner();
      }
      CompleteNoMoreResults();
      return;
    }

    int64_t num_complete_rows = results_cache_->num_complete_rows() - num_complete_rows_before;
    if (state == ScanControllerState::kSuspended) {
      if (scan_controller->resumer()->Prepare(resp, num_complete_rows)) {
        return;
      }
    }
  } catch (const std::runtime_error& e) {
    // We can not retry here. The server has responded normally and the call sequence has been
    // increased so a new scan with the same call sequence will cause an
    // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
    LOG(WARNING) << "Received exception in reading the scan response:" << e.what();
    CompleteWhenError(true);
    return;
  }

  CompleteOrNext(resp);
}