protected void reduce()

in indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java [646:878]


    protected void reduce(BytesWritable key, Iterable<BytesWritable> values, final Context context)
        throws IOException, InterruptedException
    {
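      // Recover the time bucket encoded in the sortable group key and resolve the
      // segment interval that this reducer's bucket falls into.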
      SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
      Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;

      final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();

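      // Rows for this bucket are accumulated in an in-memory IncrementalIndex; full
      // indexes are spilled to intermediate files, optionally on a background persist executor.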
      ListeningExecutorService persistExecutor = null;
      List<ListenableFuture<?>> persistFutures = new ArrayList<>();
      IncrementalIndex index = makeIncrementalIndex(
          bucket,
          combiningAggs,
          config,
          null,
          null
      );
      try {
        File baseFlushFile = FileUtils.createTempDir("base-flush");
        Set<File> toMerge = new TreeSet<>();
        int indexCount = 0;
        int lineCount = 0;
        int runningTotalLineCount = 0;
        long startTime = System.currentTimeMillis();

        Set<String> allDimensionNames = new LinkedHashSet<>();
        final ProgressIndicator progressIndicator = makeProgressIndicator(context);
        int numBackgroundPersistThreads = config.getSchema().getTuningConfig().getNumBackgroundPersistThreads();
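        // When background persist threads are configured, spills run on a bounded pool whose
        // rejection handler blocks the submitter until a worker is free; otherwise persists
        // run inline on the reducer thread via a direct executor.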
        if (numBackgroundPersistThreads > 0) {
          final BlockingQueue<Runnable> queue = new SynchronousQueue<>();
          ExecutorService executorService = new ThreadPoolExecutor(
              numBackgroundPersistThreads,
              numBackgroundPersistThreads,
              0L,
              TimeUnit.MILLISECONDS,
              queue,
              Execs.makeThreadFactory("IndexGeneratorJob_persist_%d"),
              new RejectedExecutionHandler()
              {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
                {
                  try {
                    executor.getQueue().put(r);
                  }
                  catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
                  }
                }
              }
          );
          persistExecutor = MoreExecutors.listeningDecorator(executorService);
        } else {
          persistExecutor = Execs.directExecutor();
        }

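        // Main loop: deserialize each value back into an InputRow, add it to the in-memory
        // index, and spill the index to disk whenever it can no longer accept rows.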
        for (final BytesWritable bw : values) {
          context.progress();

          final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(typeHelperMap, bw.getBytes(), aggregators));
          int numRows = index.add(inputRow).getRowCount();

          ++lineCount;

          if (!index.canAppendRow()) {
            allDimensionNames.addAll(index.getDimensionNames(false));

            log.info(index.getOutOfRowsReason());
            log.info(
                "%,d lines to %,d rows in %,d millis",
                lineCount - runningTotalLineCount,
                numRows,
                System.currentTimeMillis() - startTime
            );
            runningTotalLineCount = lineCount;

            final File file = new File(baseFlushFile, StringUtils.format("index%,05d", indexCount));
            toMerge.add(file);

            context.progress();
            final IncrementalIndex persistIndex = index;
            persistFutures.add(
                persistExecutor.submit(
                    new ThreadRenamingRunnable(StringUtils.format("%s-persist", file.getName()))
                    {
                      @Override
                      public void doRun()
                      {
                        try {
                          persist(persistIndex, interval, file, progressIndicator);
                        }
                        catch (Exception e) {
                          log.error(e, "persist index error");
                          throw new RuntimeException(e);
                        }
                        finally {
                          // close this index
                          persistIndex.close();
                        }
                      }
                    }
                )
            );

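            // Swap in a fresh in-memory index for the next chunk of rows, carrying over the
            // dimension order and column formats of the index being persisted.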
            index = makeIncrementalIndex(
                bucket,
                combiningAggs,
                config,
                index.getDimensionOrder(),
                persistIndex.getColumnFormats()
            );
            startTime = System.currentTimeMillis();
            ++indexCount;
          }
        }

        allDimensionNames.addAll(index.getDimensionNames(false));

        log.info("%,d lines completed.", lineCount);

        List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
        final File mergedBase;

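        // If nothing was spilled, persist the in-memory index directly as the merged output.
        // Otherwise persist any remaining rows, wait for outstanding background persists to
        // finish, and merge all intermediate segments into a single directory.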
        if (toMerge.size() == 0) {
          if (index.isEmpty()) {
            throw new IAE("If you try to persist empty indexes you are going to have a bad time");
          }

          mergedBase = new File(baseFlushFile, "merged");
          persist(index, interval, mergedBase, progressIndicator);
        } else {
          if (!index.isEmpty()) {
            final File finalFile = new File(baseFlushFile, "final");
            persist(index, interval, finalFile, progressIndicator);
            toMerge.add(finalFile);
          }

          Futures.allAsList(persistFutures).get(1, TimeUnit.HOURS);
          persistExecutor.shutdown();

          for (File file : toMerge) {
            indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file));
          }

          log.info("starting merge of intermediate persisted segments.");
          long mergeStartTime = System.currentTimeMillis();
          mergedBase = mergeQueryableIndex(indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator);
          log.info(
              "finished merge of intermediate persisted segments. time taken [%d] ms.",
              (System.currentTimeMillis() - mergeStartTime)
          );
        }
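        // Filesystem backing the configured segment output path.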
        final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
            .getFileSystem(context.getConfiguration());

        // ShardSpec used for partitioning within this Hadoop job.
        final ShardSpec shardSpecForPartitioning = config.getShardSpec(bucket).getActualSpec();

        // ShardSpec to be published.
        final ShardSpec shardSpecForPublishing;
        if (config.isForceExtendableShardSpecs()) {
          shardSpecForPublishing = new NumberedShardSpec(
              shardSpecForPartitioning.getPartitionNum(),
              config.getShardSpecCount(bucket)
          );
        } else {
          shardSpecForPublishing = shardSpecForPartitioning;
        }

        final DataSegment segmentTemplate = new DataSegment(
            config.getDataSource(),
            interval,
            config.getSchema().getTuningConfig().getVersion(),
            null,
            ImmutableList.copyOf(allDimensionNames),
            metricNames,
            shardSpecForPublishing,
            -1,
            0
        );

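        // Zip the merged index and push it to the segment output path, staging the zip under
        // a task-attempt-specific temporary path first.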
        final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = JobHelper.serializeOutIndex(
            segmentTemplate,
            context.getConfiguration(),
            context,
            mergedBase,
            JobHelper.makeFileNamePath(
                new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
                outputFS,
                segmentTemplate,
                JobHelper.INDEX_ZIP,
                HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
            ),
            JobHelper.makeTmpPath(
                new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
                outputFS,
                segmentTemplate,
                context.getTaskAttemptID(),
                HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
            ),
            HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
        );

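        // Write the descriptor for the pushed segment to the descriptor info path.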
        Path descriptorPath = config.makeDescriptorInfoPath(segmentAndIndexZipFilePath.getSegment());
        descriptorPath = JobHelper.prependFSIfNullScheme(
            FileSystem.get(
                descriptorPath.toUri(),
                context.getConfiguration()
            ), descriptorPath
        );

        log.info("Writing descriptor to path[%s]", descriptorPath);
        JobHelper.writeSegmentDescriptor(
            config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()),
            segmentAndIndexZipFilePath,
            descriptorPath,
            context
        );
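        // Remove the intermediate spill directories now that the merged segment has been pushed.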
        for (File file : toMerge) {
          FileUtils.deleteDirectory(file);
        }
      }
      catch (ExecutionException | TimeoutException e) {
        throw new RuntimeException(e);
      }
      finally {
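        // Always release the in-memory index and stop any background persists still running.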
        index.close();
        if (persistExecutor != null) {
          persistExecutor.shutdownNow();
        }
      }
    }