in java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java [73:282]
/**
 * Builds a {@link PartitionPruner} for the scan described by the given scanner
 * builder.
 *
 * <p>The pruner computes the minimal set of partition key ranges that the scan
 * must touch, derived from the scan's primary key bounds, partition key bounds,
 * and column predicates. Tablets whose partitions fall outside these ranges can
 * be skipped entirely.
 *
 * @param scanner the configured scanner builder to prune partitions for
 * @return a pruner over the (possibly empty) set of partition key ranges
 */
public static PartitionPruner create(AbstractKuduScannerBuilder<?, ?> scanner) {
  Schema schema = scanner.table.getSchema();
  final PartitionSchema partitionSchema = scanner.table.getPartitionSchema();
  PartitionSchema.RangeSchema rangeSchema = partitionSchema.getRangeSchema();
  Map<String, KuduPredicate> predicates = scanner.predicates;

  // Check if the scan can be short-circuited entirely by checking the primary
  // key bounds and predicates. This also allows us to assume some invariants of the
  // scan, such as no None predicates and that the lower bound PK < upper
  // bound PK.
  if (scanner.upperBoundPrimaryKey.length > 0 &&
      Bytes.memcmp(scanner.lowerBoundPrimaryKey, scanner.upperBoundPrimaryKey) >= 0) {
    return PartitionPruner.empty();
  }
  for (KuduPredicate predicate : predicates.values()) {
    if (predicate.getType() == KuduPredicate.PredicateType.NONE) {
      return PartitionPruner.empty();
    }
  }

  // Build a set of partition key ranges which cover the tablets necessary for
  // the scan.
  //
  // Example predicate sets and resulting partition key ranges, based on the
  // following tablet schema:
  //
  // CREATE TABLE t (a INT32, b INT32, c INT32) PRIMARY KEY (a, b, c)
  // DISTRIBUTE BY RANGE (c)
  //               HASH (a) INTO 2 BUCKETS
  //               HASH (b) INTO 3 BUCKETS;
  //
  // Assume that hash(0) = 0 and hash(2) = 2.
  //
  // | Predicates | Partition Key Ranges                                   |
  // +------------+--------------------------------------------------------+
  // | a = 0      | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
  // | b = 2      |                                                        |
  // | c = 0      |                                                        |
  // +------------+--------------------------------------------------------+
  // | a = 0      | [(bucket=0, bucket=2), (bucket=0, bucket=3))           |
  // | b = 2      |                                                        |
  // +------------+--------------------------------------------------------+
  // | a = 0      | [(bucket=0, bucket=0, c=0), (bucket=0, bucket=0, c=1)) |
  // | c = 0      | [(bucket=0, bucket=1, c=0), (bucket=0, bucket=1, c=1)) |
  // |            | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
  // +------------+--------------------------------------------------------+
  // | b = 2      | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
  // | c = 0      | [(bucket=1, bucket=2, c=0), (bucket=1, bucket=2, c=1)) |
  // +------------+--------------------------------------------------------+
  // | a = 0      | [(bucket=0), (bucket=1))                               |
  // +------------+--------------------------------------------------------+
  // | b = 2      | [(bucket=0, bucket=2), (bucket=0, bucket=3))           |
  // |            | [(bucket=1, bucket=2), (bucket=1, bucket=3))           |
  // +------------+--------------------------------------------------------+
  // | c = 0      | [(bucket=0, bucket=0, c=0), (bucket=0, bucket=0, c=1)) |
  // |            | [(bucket=0, bucket=1, c=0), (bucket=0, bucket=1, c=1)) |
  // |            | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
  // |            | [(bucket=1, bucket=0, c=0), (bucket=1, bucket=0, c=1)) |
  // |            | [(bucket=1, bucket=1, c=0), (bucket=1, bucket=1, c=1)) |
  // |            | [(bucket=1, bucket=2, c=0), (bucket=1, bucket=2, c=1)) |
  // +------------+--------------------------------------------------------+
  // | None       | [(), ())                                               |
  //
  // If the partition key is considered as a sequence of the hash bucket
  // components and a range component, then a few patterns emerge from the
  // examples above:
  //
  // 1) The partition keys are truncated after the final constrained component
  //    Hash bucket components are constrained when the scan is limited to a
  //    subset of buckets via equality or in-list predicates on that component.
  //    Range components are constrained if they have an upper or lower bound
  //    via range or equality predicates on that component.
  //
  // 2) If the final constrained component is a hash bucket, then the
  //    corresponding bucket in the upper bound is incremented in order to make
  //    it an exclusive key.
  //
  // 3) The number of partition key ranges in the result is equal to the product
  //    of the number of buckets of each unconstrained hash component which come
  //    before a final constrained component. If there are no unconstrained hash
  //    components, then the number of resulting partition key ranges is one. Note
  //    that this can be a lot of ranges, and we may find we need to limit the
  //    algorithm to give up on pruning if the number of ranges exceeds a limit.
  //    Until this becomes a problem in practice, we'll continue always pruning,
  //    since it is precisely these highly-hash-partitioned tables which get the
  //    most benefit from pruning.

  // Step 1: Build the range portion of the partition key. If the range partition
  // columns match the primary key columns, then we can substitute the primary
  // key bounds, if they are tighter.
  byte[] rangeLowerBound = pushPredsIntoLowerBoundRangeKey(schema, rangeSchema, predicates);
  byte[] rangeUpperBound = pushPredsIntoUpperBoundRangeKey(schema, rangeSchema, predicates);
  if (partitionSchema.isSimpleRangePartitioning()) {
    if (Bytes.memcmp(rangeLowerBound, scanner.lowerBoundPrimaryKey) < 0) {
      rangeLowerBound = scanner.lowerBoundPrimaryKey;
    }
    if (scanner.upperBoundPrimaryKey.length > 0 &&
        (rangeUpperBound.length == 0 ||
         Bytes.memcmp(rangeUpperBound, scanner.upperBoundPrimaryKey) > 0)) {
      rangeUpperBound = scanner.upperBoundPrimaryKey;
    }
  }

  // Since the table can contain range-specific hash schemas, it's necessary
  // to split the original range into sub-ranges where each subrange comes
  // with appropriate hash schema.
  List<PartitionSchema.EncodedRangeBoundsWithHashSchema> preliminaryRanges =
      splitIntoHashSpecificRanges(rangeLowerBound, rangeUpperBound, partitionSchema);

  List<Pair<byte[], byte[]>> partitionKeyRangeBytes = new ArrayList<>();
  for (PartitionSchema.EncodedRangeBoundsWithHashSchema preliminaryRange : preliminaryRanges) {
    // Step 2: Create the hash bucket portion of the partition key.
    final List<PartitionSchema.HashBucketSchema> hashBucketSchemas =
        preliminaryRange.hashSchemas;
    // List of pruned hash buckets per hash component.
    List<BitSet> hashComponents = new ArrayList<>(hashBucketSchemas.size());
    for (PartitionSchema.HashBucketSchema hashSchema : hashBucketSchemas) {
      hashComponents.add(pruneHashComponent(schema, hashSchema, predicates));
    }

    // The index of the final constrained component in the partition key.
    int constrainedIndex = 0;
    if (preliminaryRange.lower.length > 0 || preliminaryRange.upper.length > 0) {
      // The range component is constrained if either of the range bounds are
      // specified (non-empty).
      constrainedIndex = hashBucketSchemas.size();
    } else {
      // Search the hash bucket constraints from right to left, looking for the
      // first constrained component.
      for (int i = hashComponents.size(); i > 0; i--) {
        int numBuckets = hashBucketSchemas.get(i - 1).getNumBuckets();
        BitSet hashBuckets = hashComponents.get(i - 1);
        // A component is constrained if at least one of its buckets is pruned.
        if (hashBuckets.nextClearBit(0) < numBuckets) {
          constrainedIndex = i;
          break;
        }
      }
    }

    // Build up a set of partition key ranges out of the hash components.
    //
    // Each hash component simply appends its bucket number to the
    // partition key ranges (possibly incrementing the upper bound by one bucket
    // number if this is the final constraint, see note 2 in the example above).
    List<Pair<ByteVec, ByteVec>> partitionKeyRanges = new ArrayList<>();
    partitionKeyRanges.add(new Pair<>(ByteVec.create(), ByteVec.create()));
    for (int hashIdx = 0; hashIdx < constrainedIndex; hashIdx++) {
      // This is the final partition key component if this is the final constrained
      // bucket, and the range upper bound is empty. In this case we need to
      // increment the bucket on the upper bound to convert from inclusive to
      // exclusive.
      boolean isLast = hashIdx + 1 == constrainedIndex && preliminaryRange.upper.length == 0;
      BitSet hashBuckets = hashComponents.get(hashIdx);
      List<Pair<ByteVec, ByteVec>> newPartitionKeyRanges =
          new ArrayList<>(partitionKeyRanges.size() * hashBuckets.cardinality());
      for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
        for (int bucket = hashBuckets.nextSetBit(0);
             bucket != -1;
             bucket = hashBuckets.nextSetBit(bucket + 1)) {
          int bucketUpper = isLast ? bucket + 1 : bucket;
          ByteVec lower = partitionKeyRange.getFirst().clone();
          // BUGFIX: the upper bound must be extended from the range's existing
          // upper bound (getSecond()), not from its lower bound. Cloning
          // getFirst() for both sides discarded the accumulated upper-bound
          // prefix whenever multiple hash components were constrained,
          // producing incorrect (possibly inverted) partition key ranges.
          ByteVec upper = partitionKeyRange.getSecond().clone();
          KeyEncoder.encodeHashBucket(bucket, lower);
          KeyEncoder.encodeHashBucket(bucketUpper, upper);
          newPartitionKeyRanges.add(new Pair<>(lower, upper));
        }
      }
      partitionKeyRanges = newPartitionKeyRanges;
    }

    // Step 3: append the (possibly empty) range bounds to the partition key ranges.
    for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
      range.getFirst().append(preliminaryRange.lower);
      range.getSecond().append(preliminaryRange.upper);
    }

    // Step 4: Filter ranges that fall outside the scan's upper and lower bound partition keys.
    for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
      byte[] lower = range.getFirst().toArray();
      byte[] upper = range.getSecond().toArray();

      // Sanity check that the lower bound is less than the upper bound
      // (an empty upper bound means "unbounded").
      assert upper.length == 0 || Bytes.memcmp(lower, upper) < 0;

      // Find the intersection of the ranges.
      if (scanner.lowerBoundPartitionKey.length > 0 &&
          (lower.length == 0 || Bytes.memcmp(lower, scanner.lowerBoundPartitionKey) < 0)) {
        lower = scanner.lowerBoundPartitionKey;
      }
      if (scanner.upperBoundPartitionKey.length > 0 &&
          (upper.length == 0 || Bytes.memcmp(upper, scanner.upperBoundPartitionKey) > 0)) {
        upper = scanner.upperBoundPartitionKey;
      }

      // If the intersection is valid, then add it as a range partition.
      if (upper.length == 0 || Bytes.memcmp(lower, upper) < 0) {
        partitionKeyRangeBytes.add(new Pair<>(lower, upper));
      }
    }
  }

  // The PartitionPruner's constructor expects the collection to be sorted
  // in ascending order.
  Collections.sort(partitionKeyRangeBytes,
      (lhs, rhs) -> Bytes.memcmp(lhs.getFirst(), rhs.getFirst()));
  return new PartitionPruner(new ArrayDeque<>(partitionKeyRangeBytes));
}