Status KuduScanTokenBuilder::Data::Build()

in src/kudu/client/scan_token-internal.cc [343:623]


// Builds one scan token per tablet key range that survives partition pruning
// and appends the tokens to 'tokens'.
//
// Every token carries a copy of the shared scan configuration (projection,
// primary key bounds, predicates, replica selection, read mode, timeouts, ...)
// plus the partition key bounds of its tablet. When 'include_table_metadata_'
// and/or 'include_tablet_metadata_' are set, the table and tablet metadata are
// embedded as well so that a call to the master is not needed when the token
// is turned into a scanner.
//
// Returns OK without producing any tokens if the optimized scan spec can be
// short-circuited. Returns InvalidArgument for an unknown replica selection or
// for a snapshot timestamp configured with a read mode other than
// READ_AT_SNAPSHOT, and IllegalState if a replica's tablet server reports no
// host/port. Errors from schema serialization, projection column lookup and
// the key range lookup are propagated unchanged.
//
// NOTE: tokens are appended as they are built, so if an error is returned
// part-way through the tablet loop, 'tokens' still holds the tokens created
// before the failure.
Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
  KuduTable* table = configuration_.table_;
  KuduClient* client = table->client();
  configuration_.OptimizeScanSpec();

  if (configuration_.spec().CanShortCircuit()) {
    return Status::OK();
  }

  // 'pb' is the template shared by all tokens; the per-tablet fields are
  // filled into a copy of it further below.
  ScanTokenPB pb;

  if (include_table_metadata_) {
    // Set the table metadata so that a call to the master is not needed when
    // deserializing the token into a scanner.
    TableMetadataPB table_pb;
    table_pb.set_table_id(table->id());
    table_pb.set_table_name(table->name());
    table_pb.set_num_replicas(table->num_replicas());
    table_pb.set_owner(table->owner());
    table_pb.set_comment(table->comment());
    SchemaPB schema_pb;
    RETURN_NOT_OK(SchemaToPB(KuduSchema::ToSchema(table->schema()), &schema_pb));
    *table_pb.mutable_schema() = std::move(schema_pb);
    PartitionSchemaPB partition_schema_pb;
    RETURN_NOT_OK(table->partition_schema().ToPB(KuduSchema::ToSchema(table->schema()),
                                                 &partition_schema_pb));
    table_pb.mutable_partition_schema()->CopyFrom(partition_schema_pb);
    table_pb.mutable_extra_configs()->insert(table->extra_configs().begin(),
                                             table->extra_configs().end());
    *pb.mutable_table_metadata() = std::move(table_pb);

    // Only include the authz token if the table metadata is included.
    // It is returned in the required GetTableSchema request otherwise.
    SignedTokenPB authz_token;
    bool found_authz_token = client->data_->FetchCachedAuthzToken(table->id(), &authz_token);
    if (found_authz_token) {
      *pb.mutable_authz_token() = std::move(authz_token);
    }
  } else {
    // If we add the table metadata, we don't need to set the old table id
    // and table name. It is expected that the creation and use of a scan token
    // will be on the same or compatible versions.
    pb.set_table_id(table->id());
    pb.set_table_name(table->name());
  }

  // With the table schema embedded, the projection is encoded as indexes into
  // that schema; otherwise the projected columns are serialized in full.
  if (include_table_metadata_) {
    for (const ColumnSchema& col : configuration_.projection()->columns()) {
      int column_idx;
      RETURN_NOT_OK(table->schema().schema_->FindColumn(col.name(), &column_idx));
      pb.mutable_projected_column_idx()->Add(column_idx);
    }
  } else {
    RETURN_NOT_OK(SchemaToColumnPBs(*configuration_.projection(), pb.mutable_projected_columns(),
        SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));
  }

  // Primary key bounds of the whole scan, as encoded keys.
  if (configuration_.spec().lower_bound_key()) {
    pb.mutable_lower_bound_primary_key()->assign(
      reinterpret_cast<const char*>(configuration_.spec().lower_bound_key()->encoded_key().data()),
      configuration_.spec().lower_bound_key()->encoded_key().size());
  } else {
    pb.clear_lower_bound_primary_key();
  }
  if (configuration_.spec().exclusive_upper_bound_key()) {
    pb.mutable_upper_bound_primary_key()->assign(reinterpret_cast<const char*>(
          configuration_.spec().exclusive_upper_bound_key()->encoded_key().data()),
      configuration_.spec().exclusive_upper_bound_key()->encoded_key().size());
  } else {
    pb.clear_upper_bound_primary_key();
  }

  for (const auto& predicate_pair : configuration_.spec().predicates()) {
    ColumnPredicateToPB(predicate_pair.second, pb.add_column_predicates());
  }

  // Translate the client-facing replica selection into its protobuf enum.
  switch (configuration_.selection_) {
    case KuduClient::ReplicaSelection::LEADER_ONLY:
      pb.set_replica_selection(kudu::ReplicaSelection::LEADER_ONLY);
      break;
    case KuduClient::ReplicaSelection::CLOSEST_REPLICA:
      pb.set_replica_selection(kudu::ReplicaSelection::CLOSEST_REPLICA);
      break;
    case KuduClient::ReplicaSelection::FIRST_REPLICA:
      pb.set_replica_selection(kudu::ReplicaSelection::FIRST_REPLICA);
      break;
    default:
      return Status::InvalidArgument("replica_selection is invalid.");
  }

  // Translate the read mode. A snapshot timestamp is only accepted together
  // with READ_AT_SNAPSHOT.
  const KuduScanner::ReadMode read_mode = configuration_.read_mode();
  switch (read_mode) {
    case KuduScanner::READ_LATEST:
      pb.set_read_mode(kudu::READ_LATEST);
      if (configuration_.has_snapshot_timestamp()) {
        return Status::InvalidArgument("Snapshot timestamp should only be configured "
                                       "for READ_AT_SNAPSHOT scan mode.");
      }
      break;
    case KuduScanner::READ_AT_SNAPSHOT:
      pb.set_read_mode(kudu::READ_AT_SNAPSHOT);
      if (configuration_.has_start_timestamp()) {
        pb.set_snap_start_timestamp(configuration_.start_timestamp());
      }
      if (configuration_.has_snapshot_timestamp()) {
        pb.set_snap_timestamp(configuration_.snapshot_timestamp());
      }
      break;
    case KuduScanner::READ_YOUR_WRITES:
      pb.set_read_mode(kudu::READ_YOUR_WRITES);
      if (configuration_.has_snapshot_timestamp()) {
        return Status::InvalidArgument("Snapshot timestamp should only be configured "
                                       "for READ_AT_SNAPSHOT scan mode.");
      }
      break;
    default:
      LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode);
  }

  pb.set_cache_blocks(configuration_.spec().cache_blocks());
  pb.set_fault_tolerant(configuration_.is_fault_tolerant());
  pb.set_propagated_timestamp(client->GetLatestObservedTimestamp());
  pb.set_scan_request_timeout_ms(configuration_.timeout().ToMilliseconds());

  if (configuration_.has_batch_size_bytes()) {
    pb.set_batch_size_bytes(configuration_.batch_size_bytes());
  }

  pb.set_query_id(query_id_);

  // Walk the partition key ranges that the pruner has not ruled out and
  // collect the (key range, tablet) pairs a token has to be created for.
  PartitionPruner pruner;
  vector<MetaCache::RangeWithRemoteTablet> range_tablets;
  pruner.Init(*table->schema().schema_, table->partition_schema(), configuration_.spec());
  while (pruner.HasMorePartitionKeyRanges()) {
    vector<MetaCache::RangeWithRemoteTablet> tmp_range_tablets;
    const auto& partition_key = pruner.NextPartitionKey();
    Status s = client->data_->meta_cache_->GetTableKeyRanges(
        table,
        partition_key,
        MetaCache::LookupType::kLowerBound,
        split_size_bytes_,
        client->default_rpc_timeout(),
        &tmp_range_tablets);

    if (s.IsNotFound()) {
      // Nothing was found at or after 'partition_key'.
      // NOTE(review): presumably an empty key drops every remaining range and
      // thereby ends the loop -- confirm against PartitionPruner.
      pruner.RemovePartitionKeyRange({});
      continue;
    }
    RETURN_NOT_OK(s);

    if (tmp_range_tablets.empty()) {
      pruner.RemovePartitionKeyRange(partition_key);
    } else {
      // If split_size_bytes_ set to zero, we just do search in meta cache.
      // Check if the meta cache returned a tablet covering a partition key range past
      // what we asked for. This can happen if the requested partition key falls
      // in a non-covered range. In this case we can potentially prune the tablet.
      if (split_size_bytes_ == 0 &&
          partition_key < tmp_range_tablets.back().remote_tablet->partition().begin() &&
          pruner.ShouldPrune(tmp_range_tablets.back().remote_tablet->partition())) {
        pruner.RemovePartitionKeyRange(tmp_range_tablets.back().remote_tablet->partition().end());
        continue;
      }
      // Copy (not move) the entries: the last one is still needed below to
      // advance the pruner.
      range_tablets.insert(range_tablets.end(),
                           tmp_range_tablets.begin(),
                           tmp_range_tablets.end());
      pruner.RemovePartitionKeyRange(tmp_range_tablets.back().remote_tablet->partition().end());
    }
  }

  for (const auto& range_tablet : range_tablets) {
    const auto& range = range_tablet.key_range;
    const auto& tablet = range_tablet.remote_tablet;

    vector<internal::RemoteReplica> replicas;
    tablet->GetRemoteReplicas(&replicas);

    // 'deleter' covers the raw replica pointers until the vector is handed
    // over to the KuduTablet below, e.g. on the early error return.
    vector<const KuduReplica*> client_replicas;
    ElementDeleter deleter(&client_replicas);

    // Convert the replicas from their internal format to something appropriate
    // for clients.
    for (const auto& r : replicas) {
      vector<HostPort> host_ports;
      r.ts->GetHostPorts(&host_ports);
      if (host_ports.empty()) {
        return Status::IllegalState(Substitute(
            "No host found for tablet server $0", r.ts->ToString()));
      }
      unique_ptr<KuduTabletServer> client_ts(new KuduTabletServer);
      client_ts->data_ = new KuduTabletServer::Data(r.ts->permanent_uuid(),
                                                    host_ports[0],
                                                    r.ts->location());
      bool is_leader = r.role == consensus::RaftPeerPB::LEADER;
      bool is_voter = is_leader || r.role == consensus::RaftPeerPB::FOLLOWER;
      unique_ptr<KuduReplica> client_replica(new KuduReplica);
      client_replica->data_ = new KuduReplica::Data(is_leader, is_voter,
                                                    std::move(client_ts));
      client_replicas.push_back(client_replica.release());
    }

    unique_ptr<KuduTablet> client_tablet(new KuduTablet);
    client_tablet->data_ = new KuduTablet::Data(tablet->tablet_id(),
                                                std::move(client_replicas));
    // Make sure the moved-from vector is empty so 'deleter' has nothing left
    // to delete.
    client_replicas.clear();

    // Create the scan token itself.
    ScanTokenPB message;
    message.CopyFrom(pb);
    message.set_lower_bound_partition_key(tablet->partition().begin().ToString());
    message.set_upper_bound_partition_key(tablet->partition().end().ToString());
    // When tablets are split into several key ranges, the range's primary key
    // bounds replace the scan-wide bounds copied from 'pb'.
    if (!range.start_primary_key().empty() && split_size_bytes_) {
      message.clear_lower_bound_primary_key();
      message.set_lower_bound_primary_key(range.start_primary_key());
    }
    if (!range.stop_primary_key().empty() && split_size_bytes_) {
      message.clear_upper_bound_primary_key();
      message.set_upper_bound_primary_key(range.stop_primary_key());
    }


    // Set the tablet metadata so that a call to the master is not needed to
    // locate the tablet to scan when opening the scanner.
    if (include_tablet_metadata_) {
      internal::MetaCacheEntry entry;
      if (client->data_->meta_cache_->LookupEntryByKeyFastPath(table,
          tablet->partition().begin(), &entry)) {
        if (!entry.is_non_covered_range() && !entry.stale()) {
          TabletMetadataPB tablet_pb;
          tablet_pb.set_tablet_id(entry.tablet()->tablet_id());
          PartitionPB partition_pb;
          entry.tablet()->partition().ToPB(&partition_pb);
          *tablet_pb.mutable_partition() = std::move(partition_pb);
          // The entry may expire between the stale() check above and this
          // call to Now(); never emit a negative TTL in that case.
          const auto ttl_millis =
              (entry.expiration_time() - MonoTime::Now()).ToMilliseconds();
          tablet_pb.set_ttl_millis(ttl_millis > 0 ? ttl_millis : 0);

          // Build the list of server metadata. 'server_index_map' maps each
          // RPC address to the position of its server in 'tablet_servers'.
          vector<RemoteTabletServer*> servers;
          map<string, int> server_index_map;
          entry.tablet()->GetRemoteTabletServers(&servers);
          const int num_servers = static_cast<int>(servers.size());
          for (int i = 0; i < num_servers; ++i) {
            RemoteTabletServer* server = servers[i];
            ServerMetadataPB server_pb;
            server_pb.set_uuid(server->permanent_uuid());
            server_pb.set_location(server->location());
            vector<HostPort> host_ports;
            server->GetHostPorts(&host_ports);
            for (const HostPort& host_port : host_ports) {
              *server_pb.add_rpc_addresses() = HostPortToPB(host_port);
              server_index_map[host_port.ToString()] = i;
            }
            *tablet_pb.add_tablet_servers() = std::move(server_pb);
          }

          // Build the list of replica metadata. The replicas are fetched
          // separately from the servers above, so a replica is skipped if it
          // cannot be matched to one of the listed servers: indexing an empty
          // address list or defaulting to server 0 would produce a bogus
          // token.
          vector<RemoteReplica> cached_replicas;
          entry.tablet()->GetRemoteReplicas(&cached_replicas);
          for (const RemoteReplica& replica : cached_replicas) {
            vector<HostPort> host_ports;
            replica.ts->GetHostPorts(&host_ports);
            if (host_ports.empty()) {
              VLOG(1) << Substitute(
                  "Skipping replica in scan token: no host found for tablet server $0",
                  replica.ts->ToString());
              continue;
            }
            const auto server_it = server_index_map.find(host_ports[0].ToString());
            if (server_it == server_index_map.end()) {
              VLOG(1) << Substitute(
                  "Skipping replica in scan token: tablet server $0 is not listed",
                  replica.ts->ToString());
              continue;
            }
            TabletMetadataPB::ReplicaMetadataPB replica_pb;
            replica_pb.set_role(replica.role);
            replica_pb.set_ts_idx(server_it->second);
            *tablet_pb.add_replicas() = std::move(replica_pb);
          }

          *message.mutable_tablet_metadata() = std::move(tablet_pb);
        }
      }
    }

    unique_ptr<KuduScanToken> client_scan_token(new KuduScanToken);
    client_scan_token->data_ =
        new KuduScanToken::Data(table,
                                std::move(message),
                                std::move(client_tablet));
    tokens->push_back(client_scan_token.release());
  }

  return Status::OK();
}