Status KuduScanToken::Data::PBIntoScanner()

in src/kudu/client/scan_token-internal.cc [112:337]


// Rebuilds a KuduScanner from the contents of a deserialized scan token.
//
// The work happens in this order:
//   1. Reject the token if any of its feature flags is not a recognized
//      ScanTokenPB feature value (or is the explicit 'Unknown' value).
//   2. Obtain the table, either from metadata embedded in the token or, when
//      none is embedded, by opening it through the client.
//   3. If the token carries tablet metadata and the meta cache fast-path
//      lookup finds no entry for that partition, hand the cache a synthetic
//      table-locations response built from the token.
//   4. Store the token's authz token (if any) under the table's ID.
//   5. Configure a new scanner: projection, predicates, replica selection,
//      key bounds, read mode, fault tolerance, snapshot/diff timestamps,
//      block caching, batch size, timeout and query ID.
//
// 'client'  - client the scanner is created for; its meta cache, authz token
//             store and latest observed timestamp may be updated even when
//             this function later returns an error.
// 'message' - the scan token protobuf to translate.
// 'scanner' - out parameter; written only on success, with a pointer released
//             from a unique_ptr (ownership passes to the caller).
//
// Returns Status::OK() on success, or the first non-OK status encountered.
Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
                                          const ScanTokenPB& message,
                                          KuduScanner** scanner) {
  // Every flag must be a known feature other than the 'Unknown' placeholder.
  for (int32_t flag : message.feature_flags()) {
    const bool recognized =
        ScanTokenPB::Feature_IsValid(flag) && flag != ScanTokenPB::Unknown;
    if (!recognized) {
      return Status::NotSupported(
          "scan token requires features not supported by this client version");
    }
  }

  // Without embedded table metadata the table has to be opened through the
  // client; with it, the KuduTable is assembled locally from the token.
  sp::shared_ptr<KuduTable> table;
  if (!message.has_table_metadata()) {
    TableIdentifierPB identifier;
    if (message.has_table_id()) {
      identifier.set_table_id(message.table_id());
    }
    if (message.has_table_name()) {
      identifier.set_table_name(message.table_name());
    }
    RETURN_NOT_OK(client->data_->OpenTable(client, identifier, &table));
  } else {
    const TableMetadataPB& table_meta = message.table_metadata();

    Schema token_schema;
    RETURN_NOT_OK(SchemaFromPB(table_meta.schema(), &token_schema));

    PartitionSchema token_partition_schema;
    RETURN_NOT_OK(PartitionSchema::FromPB(table_meta.partition_schema(),
                                          token_schema,
                                          &token_partition_schema));

    KuduSchema client_schema(token_schema);
    map<string, string> table_configs(table_meta.extra_configs().begin(),
                                      table_meta.extra_configs().end());
    table.reset(new KuduTable(client->shared_from_this(),
                              table_meta.table_name(),
                              table_meta.table_id(),
                              table_meta.num_replicas(),
                              table_meta.owner(),
                              table_meta.comment(),
                              client_schema,
                              token_partition_schema,
                              table_configs));
  }

  // Seed the client's tablet location cache, but only when the fast-path
  // lookup reports that no entry exists for this partition yet.
  if (message.has_tablet_metadata()) {
    const TabletMetadataPB& tablet_meta = message.tablet_metadata();
    Partition tablet_partition;
    Partition::FromPB(tablet_meta.partition(), &tablet_partition);

    MetaCacheEntry cache_entry;
    const bool already_cached = client->data_->meta_cache_->LookupEntryByKeyFastPath(
        table.get(), tablet_partition.begin(), &cache_entry);
    if (!already_cached) {
      // The meta cache is fed a made-up GetTableLocationsResponsePB so that
      // the tablet metadata from the token gets "injected" into the client.
      GetTableLocationsResponsePB fake_resp;
      fake_resp.set_ttl_millis(tablet_meta.ttl_millis());

      // The single tablet location entry, with its interned replicas.
      TabletLocationsPB* locations = fake_resp.add_tablet_locations();
      locations->set_tablet_id(tablet_meta.tablet_id());
      tablet_partition.ToPB(locations->mutable_partition());
      for (const TabletMetadataPB::ReplicaMetadataPB& src_replica : tablet_meta.replicas()) {
        TabletLocationsPB::InternedReplicaPB* replica = locations->add_interned_replicas();
        replica->set_ts_info_idx(src_replica.ts_idx());
        replica->set_role(src_replica.role());
        if (src_replica.has_dimension_label()) {
          replica->set_dimension_label(src_replica.dimension_label());
        }
      }

      // The tablet servers that the interned replicas refer to.
      for (const ServerMetadataPB& src_server : tablet_meta.tablet_servers()) {
        TSInfoPB* server = fake_resp.add_ts_infos();
        server->set_permanent_uuid(src_server.uuid());
        server->set_location(src_server.location());
        for (const HostPortPB& address : src_server.rpc_addresses()) {
          *server->add_rpc_addresses() = address;
        }
      }

      // NOTE(review): the literal 'true' and '1' arguments are forwarded
      // unchanged; their meaning is defined by the meta cache, not visible here.
      RETURN_NOT_OK(client->data_->meta_cache_->ProcessGetTableLocationsResponse(
          table.get(), tablet_partition.begin(), true, fake_resp, &cache_entry, 1));
    }
  }

  if (message.has_authz_token()) {
    client->data_->StoreAuthzToken(table->id(), message.authz_token());
  }

  Schema* table_schema = table->schema().schema_;

  unique_ptr<KuduScanner> new_scanner(new KuduScanner(table.get()));

  // TODO: for statements like "create table as select *", auto_incrementing_id has to be
  // omitted from projected columns. We could provide API to toggle the presence of
  // the auto incrementing column.
  //
  // Explicit column indexes in the token take precedence. Otherwise each
  // projected column is resolved by name and checked against the table schema
  // for matching type and nullability.
  vector<int> projection;
  if (message.projected_column_idx().empty()) {
    for (const ColumnSchemaPB& wanted : message.projected_columns()) {
      const int idx = table_schema->find_column(wanted.name());
      if (idx == Schema::kColumnNotFound) {
        return Status::IllegalState("unknown column in scan token", wanted.name());
      }

      const auto& actual = table_schema->column(idx);
      const DataType actual_type = actual.type_info()->type();
      if (wanted.type() != actual_type) {
        return Status::IllegalState(Substitute(
            "invalid type $0 for column '$1' in scan token, expected: $2",
            DataType_Name(wanted.type()), wanted.name(), DataType_Name(actual_type)));
      }

      const bool actual_nullable = actual.is_nullable();
      if (wanted.is_nullable() != actual_nullable) {
        return Status::IllegalState(Substitute(
            "invalid nullability for column '$0' in scan token, expected: $1",
            wanted.name(), actual_nullable ? "NULLABLE" : "NOT NULL"));
      }

      projection.push_back(idx);
    }
  } else {
    projection.assign(message.projected_column_idx().begin(),
                      message.projected_column_idx().end());
  }
  RETURN_NOT_OK(new_scanner->SetProjectedColumnIndexes(projection));

  // Predicates are decoded against the table schema and added as conjuncts
  // directly on the scanner's configuration.
  ScanConfiguration* scan_config = new_scanner->data_->mutable_configuration();
  for (const ColumnPredicatePB& predicate_pb : message.column_predicates()) {
    std::optional<ColumnPredicate> decoded;
    RETURN_NOT_OK(ColumnPredicateFromPB(*table_schema, scan_config->arena(),
                                        predicate_pb, &decoded));
    scan_config->AddConjunctPredicate(std::move(*decoded));
  }

  // Translate the wire-level replica selection into the client-level one.
  switch (message.replica_selection()) {
    case kudu::ReplicaSelection::LEADER_ONLY:
      RETURN_NOT_OK_LOG(
          scan_config->SetSelection(KuduClient::ReplicaSelection::LEADER_ONLY),
          ERROR, "set replica selection LEADER_ONLY failed");
      break;
    case kudu::ReplicaSelection::CLOSEST_REPLICA:
      RETURN_NOT_OK_LOG(
          scan_config->SetSelection(KuduClient::ReplicaSelection::CLOSEST_REPLICA),
          ERROR, "set replica selection CLOSEST_REPLICA failed");
      break;
    case kudu::ReplicaSelection::FIRST_REPLICA:
      RETURN_NOT_OK_LOG(
          scan_config->SetSelection(KuduClient::ReplicaSelection::FIRST_REPLICA),
          ERROR, "set replica selection FIRST_REPLICA failed");
      break;
    default:
      return Status::NotSupported("unsupported ReplicaSelection policy");
  }

  // The raw primary key bound setters are called with deprecation warnings
  // suppressed for just these calls.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
  if (message.has_lower_bound_primary_key()) {
    RETURN_NOT_OK(new_scanner->AddLowerBoundRaw(message.lower_bound_primary_key()));
  }
  if (message.has_upper_bound_primary_key()) {
    RETURN_NOT_OK(
        new_scanner->AddExclusiveUpperBoundRaw(message.upper_bound_primary_key()));
  }
#pragma GCC diagnostic pop

  if (message.has_lower_bound_partition_key()) {
    RETURN_NOT_OK(
        new_scanner->AddLowerBoundPartitionKeyRaw(message.lower_bound_partition_key()));
  }
  if (message.has_upper_bound_partition_key()) {
    RETURN_NOT_OK(new_scanner->AddExclusiveUpperBoundPartitionKeyRaw(
        message.upper_bound_partition_key()));
  }

  // A limit carried by the token is currently not applied to the scanner.
  if (message.has_limit()) {
    // TODO(KUDU-16)
  }

  // Map the wire-level read mode onto the scanner's enum, then apply it once.
  if (message.has_read_mode()) {
    KuduScanner::ReadMode scanner_mode = KuduScanner::READ_LATEST;
    switch (message.read_mode()) {
      case ReadMode::READ_LATEST:
        scanner_mode = KuduScanner::READ_LATEST;
        break;
      case ReadMode::READ_AT_SNAPSHOT:
        scanner_mode = KuduScanner::READ_AT_SNAPSHOT;
        break;
      case ReadMode::READ_YOUR_WRITES:
        scanner_mode = KuduScanner::READ_YOUR_WRITES;
        break;
      default:
        return Status::InvalidArgument("scan token has unrecognized read mode");
    }
    RETURN_NOT_OK(new_scanner->SetReadMode(scanner_mode));
  }

  if (message.fault_tolerant()) {
    RETURN_NOT_OK(new_scanner->SetFaultTolerant());
  }

  // A snapshot timestamp alone configures a snapshot scan; together with a
  // start timestamp it configures a diff scan. A start timestamp without a
  // snapshot timestamp is ignored.
  if (message.has_snap_timestamp()) {
    if (message.has_snap_start_timestamp()) {
      RETURN_NOT_OK(new_scanner->SetDiffScan(message.snap_start_timestamp(),
                                             message.snap_timestamp()));
    } else {
      RETURN_NOT_OK(new_scanner->SetSnapshotRaw(message.snap_timestamp()));
    }
  }

  RETURN_NOT_OK(new_scanner->SetCacheBlocks(message.cache_blocks()));

  // The latest observed timestamp of the given client might be more recent
  // than the one recorded when the token was created, so the performance of
  // the scan could be affected when using READ_YOUR_WRITES mode.
  //
  // It is kept this way because the code path is simpler. Besides, in
  // practice it is very rarely the case that an active client is permanently
  // being written to and read from (using scan tokens).
  //
  // It is worth noting, however, that this is a possible optimization if
  // READ_YOUR_WRITES reads are ever seen stalling with scan tokens.
  if (message.has_propagated_timestamp()) {
    client->data_->UpdateLatestObservedTimestamp(message.propagated_timestamp());
  }

  if (message.has_batch_size_bytes()) {
    RETURN_NOT_OK(new_scanner->SetBatchSizeBytes(message.batch_size_bytes()));
  }

  if (message.has_scan_request_timeout_ms()) {
    RETURN_NOT_OK(new_scanner->SetTimeoutMillis(message.scan_request_timeout_ms()));
  }

  // An empty query ID is treated the same as an absent one.
  if (message.has_query_id() && !message.query_id().empty()) {
    RETURN_NOT_OK(new_scanner->SetQueryId(message.query_id()));
  }

  // Everything succeeded: hand the scanner over to the caller.
  *scanner = new_scanner.release();
  return Status::OK();
}