in src/kudu/client/scan_token-internal.cc [343:623]
// Builds the scan tokens for the configured scan.
//
// The scan spec is optimized first; if it can short-circuit, OK is returned
// and *tokens is left untouched. Otherwise a template ScanTokenPB is filled in
// with the table identity (or full table metadata), the projection, primary
// key bounds, predicates, replica selection, read mode and other scanner
// options. The partition pruner then walks the table's partition key ranges
// through the meta cache, and one token is emitted per (key range, tablet)
// pair the meta cache reports.
//
// @param tokens  Output vector. Newly allocated KuduScanToken objects are
//                appended and ownership passes to the caller. Tokens appended
//                before an error is returned are left in the vector.
// @return OK on success; InvalidArgument for an unsupported replica selection
//         or a snapshot timestamp combined with a non-snapshot read mode;
//         IllegalState if a replica's tablet server reports no host; or any
//         error from schema/partition serialization or the meta cache lookup.
Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
  KuduTable* table = configuration_.table_;
  KuduClient* client = table->client();
  configuration_.OptimizeScanSpec();
  if (configuration_.spec().CanShortCircuit()) {
    return Status::OK();
  }
  ScanTokenPB pb;
  if (include_table_metadata_) {
    // Set the table metadata so that a call to the master is not needed when
    // deserializing the token into a scanner.
    TableMetadataPB table_pb;
    table_pb.set_table_id(table->id());
    table_pb.set_table_name(table->name());
    table_pb.set_num_replicas(table->num_replicas());
    table_pb.set_owner(table->owner());
    table_pb.set_comment(table->comment());
    SchemaPB schema_pb;
    RETURN_NOT_OK(SchemaToPB(KuduSchema::ToSchema(table->schema()), &schema_pb));
    *table_pb.mutable_schema() = std::move(schema_pb);
    PartitionSchemaPB partition_schema_pb;
    RETURN_NOT_OK(table->partition_schema().ToPB(KuduSchema::ToSchema(table->schema()),
                                                 &partition_schema_pb));
    table_pb.mutable_partition_schema()->CopyFrom(partition_schema_pb);
    table_pb.mutable_extra_configs()->insert(table->extra_configs().begin(),
                                             table->extra_configs().end());
    *pb.mutable_table_metadata() = std::move(table_pb);
    // Only include the authz token if the table metadata is included.
    // It is returned in the required GetTableSchema request otherwise.
    SignedTokenPB authz_token;
    bool found_authz_token = client->data_->FetchCachedAuthzToken(table->id(), &authz_token);
    if (found_authz_token) {
      *pb.mutable_authz_token() = std::move(authz_token);
    }
  } else {
    // If we add the table metadata, we don't need to set the old table id
    // and table name. It is expected that the creation and use of a scan token
    // will be on the same or compatible versions.
    pb.set_table_id(table->id());
    pb.set_table_name(table->name());
  }
  if (include_table_metadata_) {
    for (const ColumnSchema& col : configuration_.projection()->columns()) {
      int column_idx;
      RETURN_NOT_OK(table->schema().schema_->FindColumn(col.name(), &column_idx));
      pb.mutable_projected_column_idx()->Add(column_idx);
    }
  } else {
    RETURN_NOT_OK(SchemaToColumnPBs(*configuration_.projection(), pb.mutable_projected_columns(),
                                    SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));
  }
  if (configuration_.spec().lower_bound_key()) {
    pb.mutable_lower_bound_primary_key()->assign(
        reinterpret_cast<const char*>(configuration_.spec().lower_bound_key()->encoded_key().data()),
        configuration_.spec().lower_bound_key()->encoded_key().size());
  } else {
    pb.clear_lower_bound_primary_key();
  }
  if (configuration_.spec().exclusive_upper_bound_key()) {
    pb.mutable_upper_bound_primary_key()->assign(reinterpret_cast<const char*>(
        configuration_.spec().exclusive_upper_bound_key()->encoded_key().data()),
        configuration_.spec().exclusive_upper_bound_key()->encoded_key().size());
  } else {
    pb.clear_upper_bound_primary_key();
  }
  for (const auto& predicate_pair : configuration_.spec().predicates()) {
    ColumnPredicateToPB(predicate_pair.second, pb.add_column_predicates());
  }
  switch (configuration_.selection_) {
    case KuduClient::ReplicaSelection::LEADER_ONLY:
      pb.set_replica_selection(kudu::ReplicaSelection::LEADER_ONLY);
      break;
    case KuduClient::ReplicaSelection::CLOSEST_REPLICA:
      pb.set_replica_selection(kudu::ReplicaSelection::CLOSEST_REPLICA);
      break;
    case KuduClient::ReplicaSelection::FIRST_REPLICA:
      pb.set_replica_selection(kudu::ReplicaSelection::FIRST_REPLICA);
      break;
    default:
      return Status::InvalidArgument("replica_selection is invalid.");
  }
  const KuduScanner::ReadMode read_mode = configuration_.read_mode();
  switch (read_mode) {
    case KuduScanner::READ_LATEST:
      pb.set_read_mode(kudu::READ_LATEST);
      if (configuration_.has_snapshot_timestamp()) {
        return Status::InvalidArgument("Snapshot timestamp should only be configured "
                                       "for READ_AT_SNAPSHOT scan mode.");
      }
      break;
    case KuduScanner::READ_AT_SNAPSHOT:
      pb.set_read_mode(kudu::READ_AT_SNAPSHOT);
      if (configuration_.has_start_timestamp()) {
        pb.set_snap_start_timestamp(configuration_.start_timestamp());
      }
      if (configuration_.has_snapshot_timestamp()) {
        pb.set_snap_timestamp(configuration_.snapshot_timestamp());
      }
      break;
    case KuduScanner::READ_YOUR_WRITES:
      pb.set_read_mode(kudu::READ_YOUR_WRITES);
      if (configuration_.has_snapshot_timestamp()) {
        return Status::InvalidArgument("Snapshot timestamp should only be configured "
                                       "for READ_AT_SNAPSHOT scan mode.");
      }
      break;
    default:
      LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode);
  }
  pb.set_cache_blocks(configuration_.spec().cache_blocks());
  pb.set_fault_tolerant(configuration_.is_fault_tolerant());
  pb.set_propagated_timestamp(client->GetLatestObservedTimestamp());
  pb.set_scan_request_timeout_ms(configuration_.timeout().ToMilliseconds());
  if (configuration_.has_batch_size_bytes()) {
    pb.set_batch_size_bytes(configuration_.batch_size_bytes());
  }
  pb.set_query_id(query_id_);
  PartitionPruner pruner;
  vector<MetaCache::RangeWithRemoteTablet> range_tablets;
  pruner.Init(*table->schema().schema_, table->partition_schema(), configuration_.spec());
  while (pruner.HasMorePartitionKeyRanges()) {
    PartitionKey key_range;
    vector<MetaCache::RangeWithRemoteTablet> tmp_range_tablets;
    const auto& partition_key = pruner.NextPartitionKey();
    Status s = client->data_->meta_cache_->GetTableKeyRanges(
        table,
        partition_key,
        MetaCache::LookupType::kLowerBound,
        split_size_bytes_,
        client->default_rpc_timeout(),
        &tmp_range_tablets);
    if (s.IsNotFound()) {
      pruner.RemovePartitionKeyRange({});
      continue;
    }
    RETURN_NOT_OK(s);
    if (tmp_range_tablets.empty()) {
      pruner.RemovePartitionKeyRange(partition_key);
    } else {
      // If split_size_bytes_ set to zero, we just do search in meta cache.
      // Check if the meta cache returned a tablet covering a partition key range past
      // what we asked for. This can happen if the requested partition key falls
      // in a non-covered range. In this case we can potentially prune the tablet.
      if (split_size_bytes_ == 0 &&
          partition_key < tmp_range_tablets.back().remote_tablet->partition().begin() &&
          pruner.ShouldPrune(tmp_range_tablets.back().remote_tablet->partition())) {
        pruner.RemovePartitionKeyRange(tmp_range_tablets.back().remote_tablet->partition().end());
        continue;
      }
      for (auto& range_tablet : tmp_range_tablets) {
        range_tablets.push_back(range_tablet);
      }
      pruner.RemovePartitionKeyRange(tmp_range_tablets.back().remote_tablet->partition().end());
    }
  }
  for (const auto& range_tablet : range_tablets) {
    const auto& range = range_tablet.key_range;
    const auto& tablet = range_tablet.remote_tablet;
    vector<internal::RemoteReplica> replicas;
    tablet->GetRemoteReplicas(&replicas);
    vector<const KuduReplica*> client_replicas;
    ElementDeleter deleter(&client_replicas);
    // Convert the replicas from their internal format to something appropriate
    // for clients.
    for (const auto& r : replicas) {
      vector<HostPort> host_ports;
      r.ts->GetHostPorts(&host_ports);
      if (host_ports.empty()) {
        return Status::IllegalState(Substitute(
            "No host found for tablet server $0", r.ts->ToString()));
      }
      unique_ptr<KuduTabletServer> client_ts(new KuduTabletServer);
      client_ts->data_ = new KuduTabletServer::Data(r.ts->permanent_uuid(),
                                                    host_ports[0],
                                                    r.ts->location());
      bool is_leader = r.role == consensus::RaftPeerPB::LEADER;
      bool is_voter = is_leader || r.role == consensus::RaftPeerPB::FOLLOWER;
      unique_ptr<KuduReplica> client_replica(new KuduReplica);
      client_replica->data_ = new KuduReplica::Data(is_leader, is_voter,
                                                    std::move(client_ts));
      client_replicas.push_back(client_replica.release());
    }
    unique_ptr<KuduTablet> client_tablet(new KuduTablet);
    client_tablet->data_ = new KuduTablet::Data(tablet->tablet_id(),
                                                std::move(client_replicas));
    // Ownership of the replicas moved into the tablet; empty the vector so
    // the deleter does not free them a second time.
    client_replicas.clear();
    // Create the scan token itself.
    ScanTokenPB message;
    message.CopyFrom(pb);
    message.set_lower_bound_partition_key(tablet->partition().begin().ToString());
    message.set_upper_bound_partition_key(tablet->partition().end().ToString());
    if (!range.start_primary_key().empty() && split_size_bytes_) {
      message.clear_lower_bound_primary_key();
      message.set_lower_bound_primary_key(range.start_primary_key());
    }
    if (!range.stop_primary_key().empty() && split_size_bytes_) {
      message.clear_upper_bound_primary_key();
      message.set_upper_bound_primary_key(range.stop_primary_key());
    }
    // Set the tablet metadata so that a call to the master is not needed to
    // locate the tablet to scan when opening the scanner.
    if (include_tablet_metadata_) {
      internal::MetaCacheEntry entry;
      if (client->data_->meta_cache_->LookupEntryByKeyFastPath(table,
          tablet->partition().begin(), &entry)) {
        if (!entry.is_non_covered_range() && !entry.stale()) {
          TabletMetadataPB tablet_pb;
          tablet_pb.set_tablet_id(entry.tablet()->tablet_id());
          PartitionPB partition_pb;
          entry.tablet()->partition().ToPB(&partition_pb);
          *tablet_pb.mutable_partition() = std::move(partition_pb);
          MonoDelta ttl = entry.expiration_time() - MonoTime::Now();
          // The staleness check above and this Now() call are not atomic, so
          // the entry may expire in between; never record a negative TTL.
          const int64_t ttl_ms = ttl.ToMilliseconds();
          tablet_pb.set_ttl_millis(ttl_ms > 0 ? ttl_ms : 0);
          // Build the list of server metadata, remembering which server index
          // each RPC address belongs to.
          vector<RemoteTabletServer*> servers;
          map<string, int> server_index_map;
          entry.tablet()->GetRemoteTabletServers(&servers);
          for (int i = 0; i < static_cast<int>(servers.size()); i++) {
            RemoteTabletServer* server = servers[i];
            ServerMetadataPB server_pb;
            server_pb.set_uuid(server->permanent_uuid());
            server_pb.set_location(server->location());
            vector<HostPort> host_ports;
            server->GetHostPorts(&host_ports);
            for (const HostPort& host_port : host_ports) {
              *server_pb.add_rpc_addresses() = HostPortToPB(host_port);
              server_index_map[host_port.ToString()] = i;
            }
            *tablet_pb.add_tablet_servers() = std::move(server_pb);
          }
          // Build the list of replica metadata. Each replica is matched to a
          // server entry via its first RPC address. The server and replica
          // lists come from separate calls, so a replica may report no address
          // or an address not recorded above. In that case the tablet metadata
          // is omitted rather than emitting a wrong or out-of-range server
          // index, the same outcome as when the fast-path lookup misses.
          vector<RemoteReplica> replicas;
          entry.tablet()->GetRemoteReplicas(&replicas);
          bool all_replicas_resolved = true;
          for (const RemoteReplica& replica : replicas) {
            vector<HostPort> host_ports;
            replica.ts->GetHostPorts(&host_ports);
            if (host_ports.empty()) {
              all_replicas_resolved = false;
              break;
            }
            const auto index_it = server_index_map.find(host_ports[0].ToString());
            if (index_it == server_index_map.end()) {
              all_replicas_resolved = false;
              break;
            }
            TabletMetadataPB::ReplicaMetadataPB replica_pb;
            replica_pb.set_role(replica.role);
            replica_pb.set_ts_idx(index_it->second);
            *tablet_pb.add_replicas() = std::move(replica_pb);
          }
          if (all_replicas_resolved) {
            *message.mutable_tablet_metadata() = std::move(tablet_pb);
          }
        }
      }
    }
    unique_ptr<KuduScanToken> client_scan_token(new KuduScanToken);
    client_scan_token->data_ =
        new KuduScanToken::Data(table,
                                std::move(message),
                                std::move(client_tablet));
    tokens->push_back(client_scan_token.release());
  }
  return Status::OK();
}