protected void innerReduce()

in indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java [697:933]


    protected void innerReduce(Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable)
        throws IOException
    {
      final ByteBuffer groupKey = ByteBuffer.wrap(keyBytes.getGroupKey());
      groupKey.position(4); // Skip the 4-byte partition int; the following long is the bucket timestamp
      final DateTime bucket = DateTimes.utc(groupKey.getLong());
      final PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());

      log.info(
          "Determining partitions for interval: %s",
          config.getGranularitySpec().bucketInterval(bucket).orNull()
      );

      // First DVC should be the total row count indicator
      final DimValueCount firstDvc = iterator.next();
      final long totalRows = firstDvc.numRows;

      if (!firstDvc.dims.isEmpty() || firstDvc.values.size() != 0) {
        throw new IllegalStateException("Expected total row indicator on first k/v pair");
      }

      // "iterator" will now take us over many candidate dimensions
      DimPartitions currentDimPartitions = null;
      DimPartition currentDimPartition = null;
      StringTuple currentDimPartitionStart = null;
      boolean currentDimSkip = false;

      // We'll store possible partitions in here
      final Map<List<String>, DimPartitions> dimPartitionss = new HashMap<>();
      final DimensionRangePartitionsSpec partitionsSpec = (DimensionRangePartitionsSpec) config.getPartitionsSpec();
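      // The "sampled" target/max sizes returned by the sampler are presumably adjusted for any
      // row sampling applied when these value counts were gathered.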
      final DeterminePartitionsJobSampler sampler = createSampler(config);

      while (iterator.hasNext()) {
        final DimValueCount dvc = iterator.next();

        if (currentDimPartitions == null || !currentDimPartitions.dims.equals(dvc.dims)) {
          // Starting a new dimension! Exciting!
          currentDimPartitions = new DimPartitions(dvc.dims);
          currentDimPartition = new DimPartition();
          currentDimPartitionStart = null;
          currentDimSkip = false;
        }

        // Respect poisoning: a negative row count marks a multi-value dimension,
        // which cannot be used for range partitioning.
        if (!currentDimSkip && dvc.numRows < 0) {
          log.info("Cannot partition on multi-value dimension: %s", dvc.dims);
          currentDimSkip = true;
        }

        if (currentDimSkip) {
          continue;
        }

        // See if we need to cut a new partition ending immediately before this dimension value
        if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > sampler.getSampledTargetPartitionSize()) {
          final ShardSpec shardSpec = createShardSpec(
              partitionsSpec instanceof SingleDimensionPartitionsSpec,
              currentDimPartitions.dims,
              currentDimPartitionStart,
              dvc.values,
              currentDimPartitions.partitions.size(),
              // Set unknown core partitions size so that the legacy way is used for checking partitionHolder
              // completeness. See SingleDimensionShardSpec.createChunk().
              SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
          );

          log.info(
              "Adding possible shard with %,d rows and %,d unique values: %s",
              currentDimPartition.rows,
              currentDimPartition.cardinality,
              shardSpec
          );

          currentDimPartition.shardSpec = shardSpec;
          currentDimPartitions.partitions.add(currentDimPartition);
          currentDimPartition = new DimPartition();
          currentDimPartitionStart = dvc.values;
        }

        // Update counters
        currentDimPartition.cardinality++;
        currentDimPartition.rows += dvc.numRows;

        if (!iterator.hasNext() || !currentDimPartitions.dims.equals(iterator.peek().dims)) {
          // Finalize the current dimension

          if (currentDimPartition.rows > 0) {
            // One more shard to go
            final ShardSpec shardSpec;

            if (currentDimPartition.rows < sampler.getSampledTargetPartitionSize() * SHARD_COMBINE_THRESHOLD &&
                !currentDimPartitions.partitions.isEmpty()) {
              // Combine with previous shard if it exists and the current shard is small enough
              final DimPartition previousDimPartition = currentDimPartitions.partitions.remove(
                  currentDimPartitions.partitions.size() - 1
              );

              final DimensionRangeShardSpec previousShardSpec = (DimensionRangeShardSpec) previousDimPartition.shardSpec;

              shardSpec = createShardSpec(
                  partitionsSpec instanceof SingleDimensionPartitionsSpec,
                  currentDimPartitions.dims,
                  previousShardSpec.getStartTuple(),
                  null,
                  previousShardSpec.getPartitionNum(),
                  // Set unknown core partitions size so that the legacy way is used for checking partitionHolder
                  // completeness. See SingleDimensionShardSpec.createChunk().
                  SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
              );

              log.info("Removing possible shard: %s", previousShardSpec);

              currentDimPartition.rows += previousDimPartition.rows;
              currentDimPartition.cardinality += previousDimPartition.cardinality;
            } else {
              // Create new shard
              shardSpec = createShardSpec(
                  partitionsSpec instanceof SingleDimensionPartitionsSpec,
                  currentDimPartitions.dims,
                  currentDimPartitionStart,
                  null,
                  currentDimPartitions.partitions.size(),
                  // Set unknown core partitions size so that the legacy way is used for checking partitionHolder
                  // completeness. See SingleDimensionShardSpec.createChunk().
                  SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
              );
            }

            log.info(
                "Adding possible shard with %,d rows and %,d unique values: %s",
                currentDimPartition.rows,
                currentDimPartition.cardinality,
                shardSpec
            );

            currentDimPartition.shardSpec = shardSpec;
            currentDimPartitions.partitions.add(currentDimPartition);
          }

          log.info(
              "Completed dimension[%s]: %,d possible shards with %,d unique values",
              currentDimPartitions.dims,
              currentDimPartitions.partitions.size(),
              currentDimPartitions.getCardinality()
          );

          // Add ourselves to the partitions map
          dimPartitionss.put(currentDimPartitions.dims, currentDimPartitions);
        }
      }

      // Choose best dimension
      if (dimPartitionss.isEmpty()) {
        throw new ISE("No suitable partitioning dimension found!");
      }

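      // Track two candidates: the dimension with the highest cardinality, and the dimension
      // whose partition sizes deviate least (by squared distance) from the sampled target size.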
      int maxCardinality = Integer.MIN_VALUE;
      long minDistance = Long.MAX_VALUE;
      DimPartitions minDistancePartitions = null;
      DimPartitions maxCardinalityPartitions = null;

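      // Skip dimensions that do not appear in every row; their counts would not cover the full data set.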
      for (final DimPartitions dimPartitions : dimPartitionss.values()) {
        if (dimPartitions.getRows() != totalRows) {
          log.info(
              "Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
              dimPartitions.dims,
              dimPartitions.getRows(),
              totalRows
          );

          continue;
        }

        // Make sure none of these shards are oversized
        boolean oversized = false;
        for (final DimPartition partition : dimPartitions.partitions) {
          if (partition.rows > sampler.getSampledMaxRowsPerSegment()) {
            log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dims, partition.shardSpec);
            oversized = true;
          }
        }

        if (oversized) {
          continue;
        }

        final int cardinality = dimPartitions.getCardinality();
        final long distance = dimPartitions.getDistanceSquaredFromTarget(sampler.getSampledTargetPartitionSize());

        if (cardinality > maxCardinality) {
          maxCardinality = cardinality;
          maxCardinalityPartitions = dimPartitions;
        }

        if (distance < minDistance) {
          minDistance = distance;
          minDistancePartitions = dimPartitions;
        }
      }

      if (maxCardinalityPartitions == null) {
        throw new ISE("No suitable partitioning dimension found!");
      }

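      // Write the chosen shard specs for this bucket's interval to the segment partition info path.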
      final OutputStream out = Utils.makePathAndOutputStream(
          context,
          config.makeSegmentPartitionInfoPath(config.getGranularitySpec().bucketInterval(bucket).get()),
          config.isOverwriteFiles()
      );

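      // Prefer the highest-cardinality dimension when it clears HIGH_CARDINALITY_THRESHOLD;
      // otherwise fall back to the dimension whose partitions are closest to the target size.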
      final DimPartitions chosenPartitions = maxCardinality > HIGH_CARDINALITY_THRESHOLD
                                             ? maxCardinalityPartitions
                                             : minDistancePartitions;

      final List<ShardSpec> chosenShardSpecs = Lists.transform(
          chosenPartitions.partitions,
          dimPartition -> dimPartition.shardSpec
      );

      log.info("Chosen partitions:");
      for (ShardSpec shardSpec : chosenShardSpecs) {
        log.info("  %s", HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(shardSpec));
      }

      try {
        HadoopDruidIndexerConfig.JSON_MAPPER
            .writerFor(
                new TypeReference<List<ShardSpec>>()
                {
                }
            )
            .writeValue(out, chosenShardSpecs);
      }
      finally {
        Closeables.close(out, false);
      }
    }