in src/kudu/client/scan_token-internal.cc [112:337]
// Rebuilds a KuduScanner from the contents of a deserialized scan token.
//
// The work is done in this order:
//   1. Reject the token if it lists a feature flag that is either not a valid
//      ScanTokenPB feature value or is ScanTokenPB::Unknown.
//   2. Obtain the table: built locally from the table metadata embedded in the
//      token when present, otherwise opened through the client by id/name.
//   3. If the token carries tablet metadata and the client's meta cache has no
//      entry for the tablet's partition start key, feed the cache a synthetic
//      GetTableLocationsResponsePB assembled from that metadata.
//   4. Pass along the authz token, if any, to the client.
//   5. Configure the scanner: projection, predicates, replica selection, key
//      bounds, read mode, fault tolerance, snapshot/diff-scan timestamps,
//      block caching, batch size, timeout and query id.
//
// 'client'  - the client the scanner is bound to; its meta cache, authz token
//             store and latest observed timestamp may be updated.
// 'message' - the scan token protobuf to rehydrate.
// 'scanner' - out parameter; written only on success. It receives a raw
//             pointer released from the owning unique_ptr, so the caller is
//             responsible for the scanner afterwards.
//
// Returns Status::OK() on success. Any failing step returns its non-OK status
// immediately; client-side updates performed by earlier steps are not undone
// and '*scanner' is left untouched.
Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
                                          const ScanTokenPB& message,
                                          KuduScanner** scanner) {
  // A token may only be used if every feature it lists is one this client
  // knows about.
  for (const int32_t flag : message.feature_flags()) {
    const bool recognized =
        ScanTokenPB::Feature_IsValid(flag) && flag != ScanTokenPB::Unknown;
    if (!recognized) {
      return Status::NotSupported(
          "scan token requires features not supported by this client version");
    }
  }

  // Resolve the table. Metadata embedded in the token is preferred; without
  // it, fall back to OpenTable to get the metadata from the master.
  sp::shared_ptr<KuduTable> table;
  if (!message.has_table_metadata()) {
    TableIdentifierPB ident;
    if (message.has_table_id()) {
      ident.set_table_id(message.table_id());
    }
    if (message.has_table_name()) {
      ident.set_table_name(message.table_name());
    }
    RETURN_NOT_OK(client->data_->OpenTable(client, ident, &table));
  } else {
    const TableMetadataPB& table_meta = message.table_metadata();

    Schema internal_schema;
    RETURN_NOT_OK(SchemaFromPB(table_meta.schema(), &internal_schema));
    KuduSchema client_schema(internal_schema);

    PartitionSchema partition_schema;
    RETURN_NOT_OK(PartitionSchema::FromPB(table_meta.partition_schema(),
                                          internal_schema,
                                          &partition_schema));

    const auto& configs_pb = table_meta.extra_configs();
    map<string, string> configs(configs_pb.begin(), configs_pb.end());

    table.reset(new KuduTable(client->shared_from_this(),
                              table_meta.table_name(),
                              table_meta.table_id(),
                              table_meta.num_replicas(),
                              table_meta.owner(),
                              table_meta.comment(),
                              client_schema,
                              partition_schema,
                              configs));
  }

  // Prime the client tablet location cache, but only when it has no entry for
  // this partition yet.
  if (message.has_tablet_metadata()) {
    const TabletMetadataPB& tablet_meta = message.tablet_metadata();
    Partition partition;
    Partition::FromPB(tablet_meta.partition(), &partition);

    MetaCacheEntry cache_entry;
    if (!client->data_->meta_cache_->LookupEntryByKeyFastPath(
            table.get(), partition.begin(), &cache_entry)) {
      // Assemble a fake GetTableLocationsResponsePB so the tablet metadata
      // from the token can be "injected" through the regular response path.
      GetTableLocationsResponsePB mock_resp;
      mock_resp.set_ttl_millis(tablet_meta.ttl_millis());

      // The single tablet location, filled in place.
      auto* location = mock_resp.add_tablet_locations();
      location->set_tablet_id(tablet_meta.tablet_id());
      partition.ToPB(location->mutable_partition());
      for (const auto& replica_meta : tablet_meta.replicas()) {
        auto* replica = location->add_interned_replicas();
        replica->set_ts_info_idx(replica_meta.ts_idx());
        replica->set_role(replica_meta.role());
        if (replica_meta.has_dimension_label()) {
          replica->set_dimension_label(replica_meta.dimension_label());
        }
      }

      // The tablet servers referenced by the replicas' ts_info_idx values.
      for (const auto& server_meta : tablet_meta.tablet_servers()) {
        auto* ts_info = mock_resp.add_ts_infos();
        ts_info->set_permanent_uuid(server_meta.uuid());
        ts_info->set_location(server_meta.location());
        for (const auto& address : server_meta.rpc_addresses()) {
          *ts_info->add_rpc_addresses() = address;
        }
      }

      // NOTE(review): the literal 'true' and '1' are positional arguments of
      // ProcessGetTableLocationsResponse; confirm their meaning against its
      // declaration.
      RETURN_NOT_OK(client->data_->meta_cache_->ProcessGetTableLocationsResponse(
          table.get(), partition.begin(), true, mock_resp, &cache_entry, 1));
    }
  }

  if (message.has_authz_token()) {
    client->data_->StoreAuthzToken(table->id(), message.authz_token());
  }

  Schema* table_schema = table->schema().schema_;
  unique_ptr<KuduScanner> builder(new KuduScanner(table.get()));

  // TODO: for statements like "create table as select *", auto_incrementing_id has to be
  // omitted from projected columns. We could provide API to toggle the presence of
  // the auto incrementing column.
  //
  // The projection comes either as a list of column indexes, which is taken
  // as-is, or as a list of column descriptions that are matched by name and
  // checked for type and nullability against the table schema.
  vector<int> projection;
  const auto& projected_indexes = message.projected_column_idx();
  if (projected_indexes.empty()) {
    for (const ColumnSchemaPB& column_pb : message.projected_columns()) {
      const int idx = table_schema->find_column(column_pb.name());
      if (idx == Schema::kColumnNotFound) {
        return Status::IllegalState("unknown column in scan token", column_pb.name());
      }

      const auto& table_column = table_schema->column(idx);

      const DataType expected_type = table_column.type_info()->type();
      if (column_pb.type() != expected_type) {
        return Status::IllegalState(Substitute(
            "invalid type $0 for column '$1' in scan token, expected: $2",
            DataType_Name(column_pb.type()),
            column_pb.name(),
            DataType_Name(expected_type)));
      }

      const bool expected_nullable = table_column.is_nullable();
      if (column_pb.is_nullable() != expected_nullable) {
        return Status::IllegalState(Substitute(
            "invalid nullability for column '$0' in scan token, expected: $1",
            column_pb.name(),
            expected_nullable ? "NULLABLE" : "NOT NULL"));
      }

      projection.push_back(idx);
    }
  } else {
    projection.assign(projected_indexes.begin(), projected_indexes.end());
  }
  RETURN_NOT_OK(builder->SetProjectedColumnIndexes(projection));

  // Predicates are decoded using the scan configuration's arena and added as
  // conjuncts.
  ScanConfiguration* scan_config = builder->data_->mutable_configuration();
  for (const ColumnPredicatePB& predicate_pb : message.column_predicates()) {
    std::optional<ColumnPredicate> decoded;
    RETURN_NOT_OK(ColumnPredicateFromPB(*table_schema, scan_config->arena(),
                                        predicate_pb, &decoded));
    scan_config->AddConjunctPredicate(std::move(*decoded));
  }

  // Translate the wire-level replica selection policy into the client enum.
  switch (message.replica_selection()) {
    case kudu::ReplicaSelection::LEADER_ONLY:
      RETURN_NOT_OK_LOG(
          scan_config->SetSelection(KuduClient::ReplicaSelection::LEADER_ONLY),
          ERROR, "set replica selection LEADER_ONLY failed");
      break;
    case kudu::ReplicaSelection::CLOSEST_REPLICA:
      RETURN_NOT_OK_LOG(
          scan_config->SetSelection(KuduClient::ReplicaSelection::CLOSEST_REPLICA),
          ERROR, "set replica selection CLOSEST_REPLICA failed");
      break;
    case kudu::ReplicaSelection::FIRST_REPLICA:
      RETURN_NOT_OK_LOG(
          scan_config->SetSelection(KuduClient::ReplicaSelection::FIRST_REPLICA),
          ERROR, "set replica selection FIRST_REPLICA failed");
      break;
    default:
      return Status::NotSupported("unsupported ReplicaSelection policy");
  }

  // The raw primary key bound setters trigger deprecation warnings, which are
  // silenced for just these calls.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
  if (message.has_lower_bound_primary_key()) {
    RETURN_NOT_OK(builder->AddLowerBoundRaw(message.lower_bound_primary_key()));
  }
  if (message.has_upper_bound_primary_key()) {
    RETURN_NOT_OK(builder->AddExclusiveUpperBoundRaw(message.upper_bound_primary_key()));
  }
#pragma GCC diagnostic pop

  if (message.has_lower_bound_partition_key()) {
    RETURN_NOT_OK(builder->AddLowerBoundPartitionKeyRaw(
        message.lower_bound_partition_key()));
  }
  if (message.has_upper_bound_partition_key()) {
    RETURN_NOT_OK(builder->AddExclusiveUpperBoundPartitionKeyRaw(
        message.upper_bound_partition_key()));
  }

  // A limit stored in the token is currently not applied to the scanner.
  if (message.has_limit()) {
    // TODO(KUDU-16)
  }

  if (message.has_read_mode()) {
    switch (message.read_mode()) {
      case ReadMode::READ_LATEST:
        RETURN_NOT_OK(builder->SetReadMode(KuduScanner::READ_LATEST));
        break;
      case ReadMode::READ_AT_SNAPSHOT:
        RETURN_NOT_OK(builder->SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
        break;
      case ReadMode::READ_YOUR_WRITES:
        RETURN_NOT_OK(builder->SetReadMode(KuduScanner::READ_YOUR_WRITES));
        break;
      default:
        return Status::InvalidArgument("scan token has unrecognized read mode");
    }
  }

  if (message.fault_tolerant()) {
    RETURN_NOT_OK(builder->SetFaultTolerant());
  }

  // Both timestamps present means a diff scan; only the end timestamp means a
  // plain snapshot scan.
  if (message.has_snap_start_timestamp() && message.has_snap_timestamp()) {
    RETURN_NOT_OK(builder->SetDiffScan(message.snap_start_timestamp(),
                                       message.snap_timestamp()));
  } else if (message.has_snap_timestamp()) {
    RETURN_NOT_OK(builder->SetSnapshotRaw(message.snap_timestamp()));
  }

  RETURN_NOT_OK(builder->SetCacheBlocks(message.cache_blocks()));

  // The latest timestamp observed by this client may be more recent than the
  // one recorded when the token was created, which can affect the performance
  // of scans in READ_YOUR_WRITES mode.
  //
  // It is kept this way because the code path is simpler. Besides, in
  // practice it is very rarely the case that an active client is permanently
  // being written to and read from (using scan tokens).
  //
  // Still, this is a possible optimization should READ_YOUR_WRITES reads ever
  // be seen stalling with scan tokens.
  if (message.has_propagated_timestamp()) {
    client->data_->UpdateLatestObservedTimestamp(message.propagated_timestamp());
  }

  if (message.has_batch_size_bytes()) {
    RETURN_NOT_OK(builder->SetBatchSizeBytes(message.batch_size_bytes()));
  }
  if (message.has_scan_request_timeout_ms()) {
    RETURN_NOT_OK(builder->SetTimeoutMillis(message.scan_request_timeout_ms()));
  }
  // An empty query id is treated the same as an absent one.
  if (message.has_query_id() && !message.query_id().empty()) {
    RETURN_NOT_OK(builder->SetQueryId(message.query_id()));
  }

  // Everything succeeded: hand ownership of the scanner to the caller.
  *scanner = builder.release();
  return Status::OK();
}