protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel)

in exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java [165:482]

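This method is the core of Drill's directory-based partition pruning: it isolates the part of the filter that references only partition columns, evaluates that sub-expression against the partition values of each batch of PartitionLocations via the interpreter, and rewrites the plan so the scan reads only the qualifying partitions, keeping the residual filter when it cannot be dropped.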

  protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {

    final String pruningClassName = getClass().getName();
    logger.debug("Beginning partition pruning, pruning class: {}", pruningClassName);
    Stopwatch totalPruningTime = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;

    final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
    PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
    final BufferAllocator allocator = optimizerContext.getAllocator();

    final Object selection = DrillRelOptUtil.getDrillTable(scanRel).getSelection();
    MetadataContext metaContext = null;
    if (selection instanceof FormatSelection) {
      metaContext = ((FormatSelection) selection).getSelection().getMetaContext();
    }

    RexNode condition;
    if (projectRel == null) {
      condition = filterRel.getCondition();
    } else {
      // get the filter as if it were below the projection.
      condition = RelOptUtil.pushPastProject(filterRel.getCondition(), projectRel);
    }

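    // Normalize n-ary AND/OR calls into nested binary calls so the condition can be analyzed pairwise below.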
    RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder());
    condition = condition.accept(visitor);

    Map<Integer, String> fieldNameMap = new HashMap<>();
    List<String> fieldNames = scanRel.getRowType().getFieldNames();
    BitSet columnBitset = new BitSet();
    BitSet partitionColumnBitSet = new BitSet();
    Map<Integer, Integer> partitionMap = new HashMap<>();

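    // Walk the scan's row type and record, for each partition column, its name, its id in the
    // partition hierarchy, and the mapping from rel column index to partition index.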
    int relColIndex = 0;
    for (String field : fieldNames) {
      final Integer partitionIndex = descriptor.getIdIfValid(field);
      if (partitionIndex != null) {
        fieldNameMap.put(partitionIndex, field);
        partitionColumnBitSet.set(partitionIndex);
        columnBitset.set(relColIndex);
        // mapping between the relColIndex and partitionIndex
        partitionMap.put(relColIndex, partitionIndex);
      }
      relColIndex++;
    }

    if (partitionColumnBitSet.isEmpty()) {
      if (totalPruningTime != null) {
        logger.debug("No partition columns are projected from the scan..continue. Total pruning elapsed time: {} ms",
            totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
      }
      setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
      return;
    }

    // stop watch to track how long we spend in different phases of pruning
    // first track how long we spend building the filter tree
    Stopwatch miscTimer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;

    FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
    c.analyze(condition);
    RexNode pruneCondition = c.getFinalCondition();
    BitSet referencedDirsBitSet = c.getReferencedDirs();

    if (miscTimer != null) {
      logger.debug("Total elapsed time to build and analyze filter tree: {} ms", miscTimer.elapsed(TimeUnit.MILLISECONDS));
      miscTimer.reset();
    }

    if (pruneCondition == null) {
      if (totalPruningTime != null) {
        logger.debug("No conditions were found eligible for partition pruning. Total pruning elapsed time: {} ms",
            totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
      }
      setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
      return;
    }

    // set up the partitions
    List<PartitionLocation> newPartitions = new ArrayList<>();
    long numTotal = 0; // total number of partitions
    int batchIndex = 0;
    PartitionLocation firstLocation = null;
    LogicalExpression materializedExpr = null;
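    // For composite partitions, spInfo / maxIndex / matchBitSet track the directory values on which
    // all qualifying partitions agree; they determine how specific a cacheFileRoot can be set later.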
    String[] spInfo = null;
    int maxIndex = -1;
    BitSet matchBitSet = new BitSet();

    // Outer loop: iterate over a list of batches of PartitionLocations
    for (List<PartitionLocation> partitions : descriptor) {
      numTotal += partitions.size();
      logger.debug("Evaluating partition pruning for batch {}", batchIndex);
      if (batchIndex == 0) { // save the first location in case everything is pruned
        firstLocation = partitions.get(0);
      }
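      // one bit per partition in this batch: after interpreter evaluation, a set bit marks a partition that may satisfy the filter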
      final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
      final VectorContainer container = new VectorContainer();

      try {
        final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
        for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
          SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
          // ParquetPartitionDescriptor.populatePruningVector() expects nullable value vectors,
          // so force nullability here to avoid class cast exceptions
          MajorType type = descriptor.getVectorType(column, settings).toBuilder().setMode(TypeProtos.DataMode.OPTIONAL).build();
          MaterializedField field = MaterializedField.create(column.getLastSegment().getNameSegment().getPath(), type);
          ValueVector v = TypeHelper.getNewVector(field, allocator);
          v.allocateNew();
          vectors[partitionColumnIndex] = v;
          container.add(v);
        }

        if (miscTimer != null) {
          // track how long we spend populating partition column vectors
          miscTimer.start();
        }

        // populate partition vectors.
        descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);

        if (miscTimer != null) {
          logger.debug("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
              miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
          miscTimer.reset();
        }

        // materialize the expression; only need to do this once
        if (batchIndex == 0) {
          materializedExpr = materializePruneExpr(pruneCondition, settings, scanRel, container);
          if (materializedExpr == null) {
            // continue without partition pruning; no need to log anything here since
            // materializePruneExpr logs it already
            if (totalPruningTime != null) {
              logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
            }
            setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
            return;
          }
        }

        output.allocateNew(partitions.size());

        if (miscTimer != null) {
          // start the timer to evaluate how long we spend in the interpreter evaluation
          miscTimer.start();
        }

        InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);

        if (miscTimer != null) {
          logger.debug("Elapsed time in interpreter evaluation: {} ms within batchIndex: {} with # of partitions : {}",
              miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex, partitions.size());
          miscTimer.reset();
        }

        int recordCount = 0;
        int qualifiedCount = 0;

        if (descriptor.supportsMetadataCachePruning() &&
            partitions.get(0).isCompositePartition() /* apply single partition check only for composite partitions */) {
          // Inner loop: within each batch iterate over the PartitionLocations
          for (PartitionLocation part : partitions) {
            assert part.isCompositePartition();
            if (!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
              newPartitions.add(part);
              // Rather than using the PartitionLocation, get the array of partition values for the directories that are
              // referenced by the filter since we are not interested in directory references in other parts of the query.
              Pair<String[], Integer> p = composePartition(referencedDirsBitSet, partitionMap, vectors, recordCount);
              String[] parts = p.getLeft();
              int tmpIndex = p.getRight();
              maxIndex = Math.max(maxIndex, tmpIndex);
              if (spInfo == null) { // initialization
                spInfo = parts;
                for (int j = 0; j <= tmpIndex; j++) {
                  if (parts[j] != null) {
                    matchBitSet.set(j);
                  }
                }
              } else {
                // compare the new partition with existing partition
                for (int j = 0; j <= tmpIndex; j++) {
                  // a level stops matching when either value is null (nulls don't match) or the values differ
                  if (parts[j] == null || spInfo[j] == null || !parts[j].equals(spInfo[j])) {
                    matchBitSet.clear(j);
                  }
                }
              }
              qualifiedCount++;
            }
            recordCount++;
          }
        } else {
          // Inner loop: within each batch iterate over the PartitionLocations
          for (PartitionLocation part : partitions) {
            if (!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
              newPartitions.add(part);
              qualifiedCount++;
            }
            recordCount++;
          }
        }
        logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
        batchIndex++;
      } catch (Exception e) {
        logger.warn("Exception while trying to prune partition.", e);
        if (totalPruningTime != null) {
          logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
        }

        setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
        return; // continue without partition pruning
      } finally {
        container.clear();
        output.clear();
      }
    }

    try {
      if (newPartitions.size() == numTotal) {
        logger.debug("No partitions were eligible for pruning");
        return;
      }

      // handle the case where all partitions are filtered out.
      boolean canDropFilter = true;
      boolean wasAllPartitionsPruned = false;
      Path cacheFileRoot = null;

      if (newPartitions.isEmpty()) {
        assert firstLocation != null;
        // Add the first non-composite partition location, since execution requires a schema.
        // In that case the filter must not be dropped.
        newPartitions.add(firstLocation.getPartitionLocationRecursive().get(0));
        canDropFilter = false;
        // NOTE: with DRILL-4530, the PruneScanRule may be called with only a list of
        // directories first and the non-composite partition location will still return
        // directories, not files.  So, additional processing is done depending on this flag
        wasAllPartitionsPruned = true;
        logger.debug("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal);

        // set the cacheFileRoot appropriately
        if (firstLocation.isCompositePartition()) {
          cacheFileRoot = Path.mergePaths(descriptor.getBaseTableLocation(), firstLocation.getCompositePartitionPath());
        }
      }

      logger.debug("Pruned {} partitions down to {}", numTotal, newPartitions.size());

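      // Drop the conjuncts that pruning fully handled; whatever remains must stay as a filter above the new scan.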
      List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
      List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
      conjuncts.removeAll(pruneConjuncts);
      RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);

      RewriteCombineBinaryOperators reverseVisitor = new RewriteCombineBinaryOperators(true, filterRel.getCluster().getRexBuilder());

      condition = condition.accept(reverseVisitor);
      pruneCondition = pruneCondition.accept(reverseVisitor);

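      // If all qualifying partitions share the same leading directory values, point cacheFileRoot at that
      // common subdirectory so the metadata cache can be read below the table root (see DRILL-4530).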
      if (descriptor.supportsMetadataCachePruning() && !wasAllPartitionsPruned) {
        // if metadata cache file could potentially be used, then assign a proper cacheFileRoot
        int index = -1;
        if (!matchBitSet.isEmpty()) {
          StringBuilder path = new StringBuilder();
          index = matchBitSet.length() - 1;

          for (int j = 0; j < matchBitSet.length(); j++) {
            if (!matchBitSet.get(j)) {
              // stop at the first index with no match and use the immediate
              // previous index
              index = j - 1;
              break;
            }
          }
          for (int j = 0; j <= index; j++) {
            path.append("/")
                .append(spInfo[j]);
          }
          cacheFileRoot = Path.mergePaths(descriptor.getBaseTableLocation(),
              DrillFileSystemUtil.createPathSafe(path.toString()));
        }
        if (index != maxIndex) {
          // if multiple partitions are being selected, we should not drop the filter
          // since we are reading the cache file at a parent/ancestor level
          canDropFilter = false;
        }

      }

      RelNode inputRel = descriptor.supportsMetadataCachePruning() ?
          descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned, metaContext) :
          descriptor.createTableScan(newPartitions, wasAllPartitionsPruned);

      if (projectRel != null) {
        inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
      }

      if (newCondition.isAlwaysTrue() && canDropFilter) {
        call.transformTo(inputRel);
      } else {
        final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
        call.transformTo(newFilter);
      }

      setPruneStatus(metaContext, PruneStatus.PRUNED);

    } catch (Exception e) {
      logger.warn("Exception while using the pruned partitions.", e);
    } finally {
      if (totalPruningTime != null) {
        logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
      }
    }
  }
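
For context, each concrete pruning rule variant simply binds the rel nodes matched by its
operand pattern and delegates to this method. A minimal sketch of such an onMatch (the
operand indices shown are illustrative; they depend on the variant's match pattern):

  @Override
  public void onMatch(RelOptRuleCall call) {
    // match pattern: Filter over Project over TableScan
    final Filter filterRel = call.rel(0);
    final Project projectRel = call.rel(1);
    final TableScan scanRel = call.rel(2);
    doOnMatch(call, filterRel, projectRel, scanRel);
  }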