in src/kudu/client/scanner-internal.cc [455:671]
// Opens a new scanner on the tablet located via 'partition_key'.
//
// The method fills in a NewScanRequestPB from the scan configuration (read
// mode, ordering, limit, predicates, primary key bounds and projection), then
// repeatedly looks up the tablet, picks a tablet server and sends the scan RPC
// until the RPC succeeds or an error is returned.
//
// Parameters:
//   partition_key  key handed to the meta cache lookup (kLowerBound lookup).
//   deadline       passed to the tablet lookup, the scan RPC and the error
//                  handler; also bounds the sleep between retries.
//   blacklist      passed to the tablet server selection and to the error
//                  handler; cleared here when the selection reports
//                  ServiceUnavailable so all candidates are tried again.
//
// Returns:
//   Status::OK() when the scanner was opened, when the lookup reports NotFound
//   (no more tablets), or when the tablet found can be pruned. Otherwise the
//   error from the projection conversion, the tablet lookup, the tablet server
//   selection or HandleError().
Status KuduScanner::Data::OpenTablet(const PartitionKey& partition_key,
                                     const MonoTime& deadline,
                                     set<string>* blacklist) {
  SCOPED_LOG_SLOW_EXECUTION(WARNING, 500, "opening tablet");

  PrepareRequest(KuduScanner::Data::NEW);
  next_req_.clear_scanner_id();
  NewScanRequestPB* scan = next_req_.mutable_new_scan_request();
  scan->set_row_format_flags(configuration_.row_format_flags());

  const KuduScanner::ReadMode read_mode = configuration_.read_mode();
  switch (read_mode) {
    case KuduScanner::READ_LATEST:
      scan->set_read_mode(kudu::READ_LATEST);
      if (configuration_.has_snapshot_timestamp()) {
        LOG(FATAL) << "Snapshot timestamp should only be configured "
                      "for READ_AT_SNAPSHOT scan mode.";
      }
      break;
    case KuduScanner::READ_AT_SNAPSHOT:
      scan->set_read_mode(kudu::READ_AT_SNAPSHOT);
      if (configuration_.has_start_timestamp()) {
        scan->set_snap_start_timestamp(configuration_.start_timestamp());
      }
      if (configuration_.has_snapshot_timestamp()) {
        scan->set_snap_timestamp(configuration_.snapshot_timestamp());
      }
      break;
    case KuduScanner::READ_YOUR_WRITES:
      scan->set_read_mode(kudu::READ_YOUR_WRITES);
      if (configuration_.has_snapshot_timestamp()) {
        LOG(FATAL) << "Snapshot timestamp should only be configured "
                      "for READ_AT_SNAPSHOT scan mode.";
      }
      break;
    default:
      LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode);
  }

  if (configuration_.is_fault_tolerant()) {
    scan->set_order_mode(kudu::ORDERED);
  } else {
    scan->set_order_mode(kudu::UNORDERED);
  }

  if (last_primary_key_.length() > 0) {
    VLOG(2) << "Setting NewScanRequestPB last_primary_key to hex value "
            << HexDump(last_primary_key_);
    scan->set_last_primary_key(last_primary_key_);
  }

  if (configuration_.spec().has_limit()) {
    // Set the limit based on the number of rows we've already returned.
    int64_t new_limit = std::max(configuration_.spec().limit() - num_rows_returned_,
                                 static_cast<int64_t>(0));
    VLOG(2) << "Setting NewScanRequestPB limit " << new_limit;
    scan->set_limit(new_limit);
  }

  scan->set_cache_blocks(configuration_.spec().cache_blocks());

  // For consistent operations, propagate the timestamp among all operations
  // performed in the context of the same client. For READ_YOUR_WRITES scan, use
  // the propagation timestamp from the scan config.
  uint64_t propagated_ts = KuduClient::kNoTimestamp;
  if (read_mode == KuduScanner::READ_YOUR_WRITES) {
    if (configuration_.has_lower_bound_propagation_timestamp()) {
      propagated_ts = configuration_.lower_bound_propagation_timestamp();
    }
  } else {
    propagated_ts = table_->client()->data_->GetLatestObservedTimestamp();
  }
  if (propagated_ts != KuduClient::kNoTimestamp) {
    scan->set_propagated_timestamp(propagated_ts);
  }

  // Set up the predicates.
  scan->clear_column_predicates();
  for (const auto& col_pred : configuration_.spec().predicates()) {
    ColumnPredicateToPB(col_pred.second, scan->add_column_predicates());
  }

  // Set up the primary key bounds, clearing any bound that is not configured.
  if (configuration_.spec().lower_bound_key()) {
    scan->mutable_start_primary_key()->assign(
        reinterpret_cast<const char*>(configuration_.spec().lower_bound_key()->encoded_key().data()),
        configuration_.spec().lower_bound_key()->encoded_key().size());
  } else {
    scan->clear_start_primary_key();
  }

  if (configuration_.spec().exclusive_upper_bound_key()) {
    scan->mutable_stop_primary_key()->assign(reinterpret_cast<const char*>(
        configuration_.spec().exclusive_upper_bound_key()->encoded_key().data()),
        configuration_.spec().exclusive_upper_bound_key()->encoded_key().size());
  } else {
    scan->clear_stop_primary_key();
  }

  RETURN_NOT_OK(SchemaToColumnPBs(*configuration_.projection(), scan->mutable_projected_columns(),
                                  SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));

  for (int attempt = 1;; attempt++) {
    Synchronizer sync;
    table_->client()->data_->meta_cache_->LookupTabletByKey(
        table_.get(),
        partition_key,
        deadline,
        internal::MetaCache::LookupType::kLowerBound,
        &remote_,
        sync.AsStatusCallback());
    Status s = sync.Wait();
    if (s.IsNotFound()) {
      // No more tablets in the table.
      partition_pruner_.RemovePartitionKeyRange({});
      DCHECK(!partition_pruner_.HasMorePartitionKeyRanges());
      return Status::OK();
    }
    RETURN_NOT_OK(s);

    // Check if the meta cache returned a tablet covering a partition key range past
    // what we asked for. This can happen if the requested partition key falls
    // in a non-covered range. In this case we can potentially prune the tablet.
    if (partition_key < remote_->partition().begin() &&
        partition_pruner_.ShouldPrune(remote_->partition())) {
      partition_pruner_.RemovePartitionKeyRange(remote_->partition().end());
      return Status::OK();
    }

    scan->set_tablet_id(remote_->tablet_id());

    RemoteTabletServer* tablet_server = nullptr;
    vector<RemoteTabletServer*> candidates;
    Status lookup_status = table_->client()->data_->GetTabletServer(
        table_->client(),
        remote_,
        configuration_.selection(),
        *blacklist,
        &candidates,
        &tablet_server);

    // If we get ServiceUnavailable, this indicates that the tablet doesn't
    // currently have any known leader. We should sleep and retry, since
    // it's likely that the tablet is undergoing a leader election and will
    // soon have one.
    if (lookup_status.IsServiceUnavailable()) {
      const MonoTime now = MonoTime::Now();
      if (now < deadline) {
        // ServiceUnavailable means that we have already blacklisted all the candidate
        // tablet servers. So, we clear the list so that we will cycle through them all
        // another time.
        blacklist->clear();

        // Linear back-off, capped so that the sleep never runs past 'deadline'.
        MonoDelta sleep_delta = MonoDelta::FromMilliseconds(attempt * 100);
        const MonoDelta remaining = deadline - now;
        if (remaining < sleep_delta) {
          sleep_delta = remaining;
        }
        VLOG(1) << "Tablet " << remote_->tablet_id() << " currently unavailable: "
                << lookup_status.ToString() << ". Sleeping for "
                << sleep_delta.ToMilliseconds() << "ms "
                << "and retrying...";
        SleepFor(sleep_delta);
        continue;
      }
    }
    RETURN_NOT_OK(lookup_status);

    // Verify the pointer is non-null before dereferencing it.
    ts_ = CHECK_NOTNULL(tablet_server);
    CHECK(ts_->proxy());
    proxy_ = ts_->proxy();

    // Leave time for a fail-over only if, besides the chosen server, there is
    // at least one more candidate that is not blacklisted.
    bool allow_time_for_failover = candidates.size() > blacklist->size() + 1;
    ScanRpcStatus scan_status = SendScanRpc(deadline, allow_time_for_failover);
    if (scan_status.result == ScanRpcStatus::OK) {
      last_error_ = Status::OK();
      scan_attempts_ = 0;
      break;
    }
    scan_attempts_++;
    RETURN_NOT_OK(HandleError(scan_status, deadline, blacklist, /* needs_reopen=*/ nullptr));
  }

  partition_pruner_.RemovePartitionKeyRange(remote_->partition().end());

  next_req_.clear_new_scan_request();
  data_in_open_ = (last_response_.has_data() && last_response_.data().num_rows() > 0) ||
      (last_response_.has_columnar_data() && last_response_.columnar_data().num_rows() > 0);
  if (last_response_.has_more_results()) {
    next_req_.set_scanner_id(last_response_.scanner_id());
    VLOG(2) << "Opened tablet " << remote_->tablet_id()
            << ", scanner ID " << last_response_.scanner_id();
  } else if (last_response_.has_data() || last_response_.has_columnar_data()) {
    VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned, "
            << " data_in_open=" << data_in_open_;
  } else {
    VLOG(2) << "Opened tablet " << remote_->tablet_id() << " (no rows), no scanner ID assigned";
  }

  // If present in the response, set the snapshot timestamp and the encoded last
  // primary key. This is used when retrying the scan elsewhere. The last
  // primary key is also updated on each scan response.
  if (configuration().is_fault_tolerant()) {
    if (last_response_.has_last_primary_key()) {
      last_primary_key_ = last_response_.last_primary_key();
    }
  }

  if (configuration_.read_mode() == KuduScanner::READ_AT_SNAPSHOT &&
      !configuration_.has_snapshot_timestamp()) {
    // There must be a snapshot timestamp returned by the tablet server:
    // it's the first response from the tablet server when scanning in the
    // READ_AT_SNAPSHOT mode with unspecified snapshot timestamp.
    CHECK(last_response_.has_snap_timestamp());
    configuration_.SetSnapshotRaw(last_response_.snap_timestamp());
  }

  // For READ_YOUR_WRITES mode, update the latest observed timestamp with
  // the chosen snapshot timestamp sent back from the server, to avoid
  // unnecessarily waiting for subsequent reads.
  if (configuration_.read_mode() == KuduScanner::READ_YOUR_WRITES) {
    CHECK(last_response_.has_snap_timestamp());
    table_->client()->data_->UpdateLatestObservedTimestamp(
        last_response_.snap_timestamp());
  } else if (last_response_.has_propagated_timestamp()) {
    table_->client()->data_->UpdateLatestObservedTimestamp(
        last_response_.propagated_timestamp());
  }

  return Status::OK();
}