public static PartitionPruner create()

in java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java [73:282]


  public static PartitionPruner create(AbstractKuduScannerBuilder<?, ?> scanner) {
    Schema schema = scanner.table.getSchema();
    final PartitionSchema partitionSchema = scanner.table.getPartitionSchema();
    PartitionSchema.RangeSchema rangeSchema = partitionSchema.getRangeSchema();
    Map<String, KuduPredicate> predicates = scanner.predicates;

    // Check whether the scan can be short-circuited entirely by examining the
    // primary key bounds and predicates. This also lets us assume some invariants
    // for the rest of the scan, such as that there are no NONE predicates and that
    // the lower bound PK is strictly less than the upper bound PK.
    if (scanner.upperBoundPrimaryKey.length > 0 &&
        Bytes.memcmp(scanner.lowerBoundPrimaryKey, scanner.upperBoundPrimaryKey) >= 0) {
      return PartitionPruner.empty();
    }
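    // A NONE predicate arises, for instance, when contradictory predicates on the
    // same column (e.g. a = 0 merged with a = 5) are combined: no row can match,
    // so no partition needs to be scanned.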
    for (KuduPredicate predicate : predicates.values()) {
      if (predicate.getType() == KuduPredicate.PredicateType.NONE) {
        return PartitionPruner.empty();
      }
    }

    // Build a set of partition key ranges which cover the tablets necessary for
    // the scan.
    //
    // Example predicate sets and resulting partition key ranges, based on the
    // following table schema:
    //
    // CREATE TABLE t (a INT32, b INT32, c INT32) PRIMARY KEY (a, b, c)
    // DISTRIBUTE BY RANGE (c)
    //               HASH (a) INTO 2 BUCKETS
    //               HASH (b) INTO 3 BUCKETS;
    //
    // Assume that hash(0) = 0 and hash(2) = 2.
    //
    // | Predicates | Partition Key Ranges                                   |
    // +------------+--------------------------------------------------------+
    // | a = 0      | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
    // | b = 2      |                                                        |
    // | c = 0      |                                                        |
    // +------------+--------------------------------------------------------+
    // | a = 0      | [(bucket=0, bucket=2), (bucket=0, bucket=3))           |
    // | b = 2      |                                                        |
    // +------------+--------------------------------------------------------+
    // | a = 0      | [(bucket=0, bucket=0, c=0), (bucket=0, bucket=0, c=1)) |
    // | c = 0      | [(bucket=0, bucket=1, c=0), (bucket=0, bucket=1, c=1)) |
    // |            | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
    // +------------+--------------------------------------------------------+
    // | b = 2      | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
    // | c = 0      | [(bucket=1, bucket=2, c=0), (bucket=1, bucket=2, c=1)) |
    // +------------+--------------------------------------------------------+
    // | a = 0      | [(bucket=0), (bucket=1))                               |
    // +------------+--------------------------------------------------------+
    // | b = 2      | [(bucket=0, bucket=2), (bucket=0, bucket=3))           |
    // |            | [(bucket=1, bucket=2), (bucket=1, bucket=3))           |
    // +------------+--------------------------------------------------------+
    // | c = 0      | [(bucket=0, bucket=0, c=0), (bucket=0, bucket=0, c=1)) |
    // |            | [(bucket=0, bucket=1, c=0), (bucket=0, bucket=1, c=1)) |
    // |            | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
    // |            | [(bucket=1, bucket=0, c=0), (bucket=1, bucket=0, c=1)) |
    // |            | [(bucket=1, bucket=1, c=0), (bucket=1, bucket=1, c=1)) |
    // |            | [(bucket=1, bucket=2, c=0), (bucket=1, bucket=2, c=1)) |
    // +------------+--------------------------------------------------------+
    // | None       | [(), ())                                               |
    // +------------+--------------------------------------------------------+
    //
    // If the partition key is considered as a sequence of the hash bucket
    // components and a range component, then a few patterns emerge from the
    // examples above:
    //
    // 1) The partition keys are truncated after the final constrained component.
    //    Hash bucket components are constrained when the scan is limited to a
    //    subset of buckets via equality or in-list predicates on that component.
    //    Range components are constrained if they have an upper or lower bound
    //    via range or equality predicates on that component.
    //
    // 2) If the final constrained component is a hash bucket, then the
    //    corresponding bucket in the upper bound is incremented in order to make
    //    it an exclusive key.
    //
    // 3) The number of partition key ranges in the result is equal to the product
    //    of the number of buckets of each unconstrained hash component which come
    //    before a final constrained component. If there are no unconstrained hash
    //    components, then the number of resulting partition key ranges is one. Note
    //    that this can be a lot of ranges, so we may eventually need to cap the
    //    algorithm and give up on pruning when the number of ranges exceeds a limit.
    //    Until this becomes a problem in practice, we'll continue always pruning,
    //    since it is precisely these highly-hash-partitioned tables which get the
    //    most benefit from pruning.
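    //    As a concrete check of this product rule: with the schema above and the
    //    single predicate c = 0, both hash components are unconstrained, giving
    //    2 * 3 = 6 partition key ranges (the c = 0 row of the table above).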

    // Step 1: Build the range portion of the partition key. If the range partition
    // columns match the primary key columns, then we can substitute the primary
    // key bounds, if they are tighter.
    byte[] rangeLowerBound = pushPredsIntoLowerBoundRangeKey(schema, rangeSchema, predicates);
    byte[] rangeUpperBound = pushPredsIntoUpperBoundRangeKey(schema, rangeSchema, predicates);
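    // For example, with the single predicate c = 0 from the table above, the lower
    // bound encodes c = 0 and the (exclusive) upper bound encodes c = 1.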

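    // "Simple" range partitioning means the table is range partitioned on exactly
    // the primary key columns with no hash components, so encoded range keys and
    // encoded primary keys are directly comparable and the tighter of the two
    // bounds can be taken.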
    if (partitionSchema.isSimpleRangePartitioning()) {
      if (Bytes.memcmp(rangeLowerBound, scanner.lowerBoundPrimaryKey) < 0) {
        rangeLowerBound = scanner.lowerBoundPrimaryKey;
      }
      if (scanner.upperBoundPrimaryKey.length > 0 &&
          (rangeUpperBound.length == 0 ||
           Bytes.memcmp(rangeUpperBound, scanner.upperBoundPrimaryKey) > 0)) {
        rangeUpperBound = scanner.upperBoundPrimaryKey;
      }
    }
    // Since the table can contain range-specific hash schemas, it's necessary
    // to split the original range into sub-ranges, where each sub-range is
    // paired with its corresponding hash schema.
    List<PartitionSchema.EncodedRangeBoundsWithHashSchema> preliminaryRanges =
        splitIntoHashSpecificRanges(rangeLowerBound, rangeUpperBound, partitionSchema);
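    // For a table that uses only the table-wide hash schema this yields a single
    // sub-range covering [rangeLowerBound, rangeUpperBound); a table with custom
    // per-range hash schemas may yield several, each paired with its own hash schema.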

    List<Pair<byte[], byte[]>> partitionKeyRangeBytes = new ArrayList<>();

    for (PartitionSchema.EncodedRangeBoundsWithHashSchema preliminaryRange : preliminaryRanges) {
      // Step 2: Create the hash bucket portion of the partition key.
      final List<PartitionSchema.HashBucketSchema> hashBucketSchemas =
          preliminaryRange.hashSchemas;
      // Per hash component, the set of hash buckets that may still contain matching
      // rows (i.e. the buckets that survive pruning).
      List<BitSet> hashComponents = new ArrayList<>(hashBucketSchemas.size());
      for (PartitionSchema.HashBucketSchema hashSchema : hashBucketSchemas) {
        hashComponents.add(pruneHashComponent(schema, hashSchema, predicates));
      }
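      // pruneHashComponent() considers the equality and IN-list predicates on the
      // component's columns: if every column of the component is covered, only the
      // buckets those values can hash to remain set; otherwise the component is
      // left fully unconstrained (all bits set).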

      // The number of hash components to include in the partition key ranges: every
      // component up to and including the final constrained one (all of them if the
      // range component is constrained).
      int constrainedIndex = 0;
      if (preliminaryRange.lower.length > 0 || preliminaryRange.upper.length > 0) {
        // The range component is constrained if either of the range bounds are
        // specified (non-empty).
        constrainedIndex = hashBucketSchemas.size();
      } else {
        // Search the hash bucket constraints from right to left, looking for the
        // first constrained component.
        for (int i = hashComponents.size(); i > 0; i--) {
          int numBuckets = hashBucketSchemas.get(i - 1).getNumBuckets();
          BitSet hashBuckets = hashComponents.get(i - 1);
          if (hashBuckets.nextClearBit(0) < numBuckets) {
            constrainedIndex = i;
            break;
          }
        }
      }
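      // For example, with only the predicate b = 2 from the table above, the 'a'
      // component is unconstrained but the 'b' component is not, so constrainedIndex
      // is 2 and the keys are truncated after the second hash bucket.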

      // Build up a set of partition key ranges out of the hash components.
      //
      // Each hash component simply appends its bucket number to the
      // partition key ranges (possibly incrementing the upper bound by one bucket
      // number if this is the final constraint, see note 2 in the example above).
      List<Pair<ByteVec, ByteVec>> partitionKeyRanges = new ArrayList<>();
      partitionKeyRanges.add(new Pair<>(ByteVec.create(), ByteVec.create()));

      for (int hashIdx = 0; hashIdx < constrainedIndex; hashIdx++) {
        // This is the final partition key component if this is the final constrained
        // bucket, and the range upper bound is empty. In this case we need to
        // increment the bucket on the upper bound to convert from inclusive to
        // exclusive.
        boolean isLast = hashIdx + 1 == constrainedIndex && preliminaryRange.upper.length == 0;
        BitSet hashBuckets = hashComponents.get(hashIdx);

        List<Pair<ByteVec, ByteVec>> newPartitionKeyRanges =
            new ArrayList<>(partitionKeyRanges.size() * hashBuckets.cardinality());
        for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
          for (int bucket = hashBuckets.nextSetBit(0);
               bucket != -1;
               bucket = hashBuckets.nextSetBit(bucket + 1)) {
            int bucketUpper = isLast ? bucket + 1 : bucket;
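            // Lower and upper still hold identical prefixes at this point (they
            // only diverge at the final constrained component), so cloning the
            // lower bound for both is safe.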
            ByteVec lower = partitionKeyRange.getFirst().clone();
            ByteVec upper = partitionKeyRange.getFirst().clone();
            KeyEncoder.encodeHashBucket(bucket, lower);
            KeyEncoder.encodeHashBucket(bucketUpper, upper);
            newPartitionKeyRanges.add(new Pair<>(lower, upper));
          }
        }
        partitionKeyRanges = newPartitionKeyRanges;
      }
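      // Continuing the b = 2 example, partitionKeyRanges now holds
      // [(bucket=0, bucket=2), (bucket=0, bucket=3)) and
      // [(bucket=1, bucket=2), (bucket=1, bucket=3)), matching the table above.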

      // Step 3: Append the (possibly empty) range bounds to the partition key ranges.
      for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
        range.getFirst().append(preliminaryRange.lower);
        range.getSecond().append(preliminaryRange.upper);
      }

      // Step 4: Filter ranges that fall outside the scan's upper and lower bound partition keys.
      for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
        byte[] lower = range.getFirst().toArray();
        byte[] upper = range.getSecond().toArray();

        // Sanity check that the lower bound is less than the upper bound.
        assert upper.length == 0 || Bytes.memcmp(lower, upper) < 0;

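        // The scanner's partition key bounds are typically set when the scanner is
        // built from a scan token that targets a single tablet, and are empty
        // otherwise.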
        // Find the intersection of the ranges.
        if (scanner.lowerBoundPartitionKey.length > 0 &&
            (lower.length == 0 || Bytes.memcmp(lower, scanner.lowerBoundPartitionKey) < 0)) {
          lower = scanner.lowerBoundPartitionKey;
        }
        if (scanner.upperBoundPartitionKey.length > 0 &&
            (upper.length == 0 || Bytes.memcmp(upper, scanner.upperBoundPartitionKey) > 0)) {
          upper = scanner.upperBoundPartitionKey;
        }

        // If the intersection is valid, then add it as a range partition.
        if (upper.length == 0 || Bytes.memcmp(lower, upper) < 0) {
          partitionKeyRangeBytes.add(new Pair<>(lower, upper));
        }
      }
    }

    // The PartitionPruner's constructor expects the collection to be sorted
    // in ascending order.
    Collections.sort(partitionKeyRangeBytes,
        (lhs, rhs) -> Bytes.memcmp(lhs.getFirst(), rhs.getFirst()));
    return new PartitionPruner(new ArrayDeque<>(partitionKeyRangeBytes));
  }