in java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java [494:738]
/**
 * Builds the scan tokens for the scan configured on this builder.
 *
 * <p>The scan configuration (table identity or metadata, projection, predicates,
 * primary key bounds, limit, read mode, timestamps and tuning knobs) is first
 * serialized into a single template message. The partition key ranges left after
 * pruning are then resolved into key ranges through the client, and one token is
 * emitted per key range returned.
 *
 * @return one token per key range; an empty list when any predicate has type
 *     {@code NONE}
 * @throws IllegalArgumentException if a partition key bound was set on this
 *     builder, or if a projected column name or index does not resolve to a
 *     column while table metadata is not included
 * @throws RuntimeException wrapping any exception raised while looking up the
 *     key ranges or assembling the tokens; if the cause is an
 *     {@link InterruptedException} the thread's interrupt status is restored first
 */
public List<KuduScanToken> build() {
  if (lowerBoundPartitionKey.length != 0 ||
      upperBoundPartitionKey.length != 0) {
    throw new IllegalArgumentException(
        "Partition key bounds may not be set on KuduScanTokenBuilder");
  }

  // If the scan is short-circuitable, then return no tokens.
  for (KuduPredicate predicate : predicates.values()) {
    if (predicate.getType() == KuduPredicate.PredicateType.NONE) {
      return new ArrayList<>();
    }
  }

  Client.ScanTokenPB.Builder proto = Client.ScanTokenPB.newBuilder();

  if (includeTableMetadata) {
    // Set the table metadata so that a call to the master is not needed when
    // deserializing the token into a scanner.
    Client.TableMetadataPB tableMetadataPB = Client.TableMetadataPB.newBuilder()
        .setTableId(table.getTableId())
        .setTableName(table.getName())
        .setOwner(table.getOwner())
        .setComment(table.getComment())
        .setNumReplicas(table.getNumReplicas())
        .setSchema(ProtobufHelper.schemaToPb(table.getSchema()))
        .setPartitionSchema(ProtobufHelper.partitionSchemaToPb(table.getPartitionSchema()))
        .putAllExtraConfigs(table.getExtraConfig())
        .build();
    proto.setTableMetadata(tableMetadataPB);

    // Only include the authz token if the table metadata is included.
    // It is returned in the required GetTableSchema request otherwise.
    Token.SignedTokenPB authzToken = client.getAuthzToken(table.getTableId());
    if (authzToken != null) {
      proto.setAuthzToken(authzToken);
    }
  } else {
    // If we add the table metadata, we don't need to set the old table id
    // and table name. It is expected that the creation and use of a scan token
    // will be on the same or compatible versions.
    proto.setTableId(table.getTableId());
    proto.setTableName(table.getName());
  }

  // Map the column names or indices to actual columns in the table schema.
  // If the user did not set either projection, then scan all columns.
  Schema schema = table.getSchema();
  if (includeTableMetadata) {
    // If the table metadata is included, then the column indexes can be
    // used instead of duplicating the ColumnSchemaPBs in the serialized
    // scan token.
    if (projectedColumnNames != null) {
      for (String columnName : projectedColumnNames) {
        proto.addProjectedColumnIdx(schema.getColumnIndex(columnName));
      }
    } else if (projectedColumnIndexes != null) {
      proto.addAllProjectedColumnIdx(projectedColumnIndexes);
    } else {
      List<Integer> indexes = IntStream.range(0, schema.getColumnCount())
          .boxed().collect(Collectors.toList());
      proto.addAllProjectedColumnIdx(indexes);
    }
  } else {
    // Without table metadata every projected column is serialized in full.
    // The column id is written as -1 when the schema carries no column ids.
    if (projectedColumnNames != null) {
      for (String columnName : projectedColumnNames) {
        ColumnSchema columnSchema = schema.getColumn(columnName);
        Preconditions.checkArgument(columnSchema != null,
            "unknown column %s", columnName);
        ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
            schema.hasColumnIds() ? schema.getColumnId(columnName) : -1,
            columnSchema);
      }
    } else if (projectedColumnIndexes != null) {
      for (int columnIdx : projectedColumnIndexes) {
        ColumnSchema columnSchema = schema.getColumnByIndex(columnIdx);
        Preconditions.checkArgument(columnSchema != null,
            "unknown column index %s", columnIdx);
        ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
            schema.hasColumnIds() ?
                schema.getColumnId(columnSchema.getName()) :
                -1,
            columnSchema);
      }
    } else {
      for (ColumnSchema column : schema.getColumns()) {
        ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
            schema.hasColumnIds() ?
                schema.getColumnId(column.getName()) :
                -1,
            column);
      }
    }
  }

  for (KuduPredicate predicate : predicates.values()) {
    proto.addColumnPredicates(predicate.toPB());
  }

  // Primary key bounds are only written when non-empty.
  if (lowerBoundPrimaryKey.length > 0) {
    proto.setLowerBoundPrimaryKey(UnsafeByteOperations.unsafeWrap(lowerBoundPrimaryKey));
  }
  if (upperBoundPrimaryKey.length > 0) {
    proto.setUpperBoundPrimaryKey(UnsafeByteOperations.unsafeWrap(upperBoundPrimaryKey));
  }

  proto.setLimit(limit);
  proto.setReadMode(readMode.pbVersion());

  // Any other replica selection value leaves the field unset in the token.
  if (replicaSelection == ReplicaSelection.LEADER_ONLY) {
    proto.setReplicaSelection(Common.ReplicaSelection.LEADER_ONLY);
  } else if (replicaSelection == ReplicaSelection.CLOSEST_REPLICA) {
    proto.setReplicaSelection(Common.ReplicaSelection.CLOSEST_REPLICA);
  }

  // If the last propagated timestamp is set send it with the scan.
  if (table.getAsyncClient().getLastPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
    proto.setPropagatedTimestamp(client.getLastPropagatedTimestamp());
  }

  // If the mode is set to read on snapshot set the snapshot timestamps.
  if (readMode == AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) {
    if (htTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
      proto.setSnapTimestamp(htTimestamp);
    }
    if (startTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
      proto.setSnapStartTimestamp(startTimestamp);
    }
  }

  proto.setCacheBlocks(cacheBlocks);
  proto.setFaultTolerant(isFaultTolerant);
  proto.setBatchSizeBytes(batchSizeBytes);
  proto.setScanRequestTimeoutMs(scanRequestTimeout);
  proto.setKeepAlivePeriodMs(keepAlivePeriodMs);

  try {
    // Walk the partition key ranges that survive pruning and resolve each of
    // them into key ranges through the client.
    PartitionPruner pruner = PartitionPruner.create(this);
    List<KeyRange> keyRanges = new ArrayList<>();
    while (pruner.hasMorePartitionKeyRanges()) {
      Pair<byte[], byte[]> partitionRange = pruner.nextPartitionKeyRange();
      // Empty partition key bounds are passed to the lookup as null.
      List<KeyRange> newKeyRanges = client.getTableKeyRanges(
          table,
          proto.getLowerBoundPrimaryKey().toByteArray(),
          proto.getUpperBoundPrimaryKey().toByteArray(),
          partitionRange.getFirst().length == 0 ? null : partitionRange.getFirst(),
          partitionRange.getSecond().length == 0 ? null : partitionRange.getSecond(),
          AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP,
          splitSizeBytes,
          timeout).join();

      // Advance the pruner: past the whole requested range when the lookup
      // returned nothing, otherwise past the end of the last key range returned.
      if (newKeyRanges.isEmpty()) {
        pruner.removePartitionKeyRange(partitionRange.getSecond());
      } else {
        pruner.removePartitionKeyRange(newKeyRanges.get(newKeyRanges.size() - 1)
            .getPartitionKeyEnd());
      }
      keyRanges.addAll(newKeyRanges);
    }

    List<KuduScanToken> tokens = new ArrayList<>(keyRanges.size());
    for (KeyRange keyRange : keyRanges) {
      // Each token starts from a copy of the shared template and then gets its
      // own partition key bounds and, when present, primary key bounds.
      Client.ScanTokenPB.Builder builder = proto.clone();
      builder.setLowerBoundPartitionKey(
          UnsafeByteOperations.unsafeWrap(keyRange.getPartitionKeyStart()));
      builder.setUpperBoundPartitionKey(
          UnsafeByteOperations.unsafeWrap(keyRange.getPartitionKeyEnd()));

      byte[] primaryKeyStart = keyRange.getPrimaryKeyStart();
      if (primaryKeyStart != null && primaryKeyStart.length > 0) {
        builder.setLowerBoundPrimaryKey(UnsafeByteOperations.unsafeWrap(primaryKeyStart));
      }
      byte[] primaryKeyEnd = keyRange.getPrimaryKeyEnd();
      if (primaryKeyEnd != null && primaryKeyEnd.length > 0) {
        builder.setUpperBoundPrimaryKey(UnsafeByteOperations.unsafeWrap(primaryKeyEnd));
      }

      LocatedTablet tablet = keyRange.getTablet();

      // Set the tablet metadata so that a call to the master is not needed to
      // locate the tablet to scan when opening the scanner.
      if (includeTabletMetadata) {
        TableLocationsCache.Entry entry = client.getTableLocationEntry(table.getTableId(),
            tablet.getPartition().partitionKeyStart);
        // The metadata is only attached when a usable cache entry exists.
        if (entry != null && !entry.isNonCoveredRange() && !entry.isStale()) {
          RemoteTablet remoteTablet = entry.getTablet();

          // Build the list of server metadata. serverIndexMap records each
          // server's position in that list so replicas can refer to it by index.
          List<Client.ServerMetadataPB> servers = new ArrayList<>();
          Map<HostAndPort, Integer> serverIndexMap = new HashMap<>();
          List<ServerInfo> tabletServers = remoteTablet.getTabletServersCopy();
          for (int i = 0; i < tabletServers.size(); i++) {
            ServerInfo serverInfo = tabletServers.get(i);
            Client.ServerMetadataPB serverMetadataPB =
                Client.ServerMetadataPB.newBuilder()
                    .setUuid(ByteString.copyFromUtf8(serverInfo.getUuid()))
                    .addRpcAddresses(
                        ProtobufHelper.hostAndPortToPB(serverInfo.getHostAndPort()))
                    .setLocation(serverInfo.getLocation())
                    .build();
            servers.add(serverMetadataPB);
            serverIndexMap.put(serverInfo.getHostAndPort(), i);
          }

          // Build the list of replica metadata.
          List<Client.TabletMetadataPB.ReplicaMetadataPB> replicas = new ArrayList<>();
          for (LocatedTablet.Replica replica : remoteTablet.getReplicas()) {
            Integer serverIndex = serverIndexMap.get(
                new HostAndPort(replica.getRpcHost(), replica.getRpcPort()));
            // If the server index is not found it means that RemoteTablet.removeTabletClient
            // was called and removed the server likely as a result of a tablet not found error.
            // In that case we should remove the replica as it can't be contacted.
            if (serverIndex == null) {
              continue;
            }

            Client.TabletMetadataPB.ReplicaMetadataPB.Builder tabletMetadataBuilder =
                Client.TabletMetadataPB.ReplicaMetadataPB.newBuilder()
                    .setRole(replica.getRoleAsEnum())
                    .setTsIdx(serverIndex);
            if (replica.getDimensionLabel() != null) {
              tabletMetadataBuilder.setDimensionLabel(replica.getDimensionLabel());
            }
            replicas.add(tabletMetadataBuilder.build());
          }

          // Build the tablet metadata and add it to the token.
          Client.TabletMetadataPB tabletMetadataPB = Client.TabletMetadataPB.newBuilder()
              .setTabletId(remoteTablet.getTabletId())
              .setPartition(ProtobufHelper.partitionToPb(remoteTablet.getPartition()))
              .addAllReplicas(replicas)
              .addAllTabletServers(servers)
              .setTtlMillis(entry.ttl())
              .build();
          builder.setTabletMetadata(tabletMetadataPB);
        }
      }

      builder.setQueryId(queryId);
      tokens.add(new KuduScanToken(tablet, builder.build()));
    }
    return tokens;
  } catch (Exception e) {
    // Wrapping an InterruptedException would otherwise hide the interruption
    // from callers, so re-assert the thread's interrupt status first.
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    throw new RuntimeException(e);
  }
}