private TezRawKeyValueIterator finalMerge()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java [1152:1318]


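  /**
   * Final merge for the ordered, grouped shuffle: combines whatever map
   * outputs are still held in memory with the on-disk files and returns a
   * single TezRawKeyValueIterator whose output is fed to the reduce side.
   */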
  private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
                                       List<MapOutput> inMemoryMapOutputs,
                                       List<FileChunk> onDiskMapOutputs
                                       ) throws IOException, InterruptedException {

    logFinalMergeStart(inMemoryMapOutputs, onDiskMapOutputs);
    StringBuilder finalMergeLog = new StringBuilder();

    inputContext.notifyProgress();

    // merge config params
    SerializationContext serContext = new SerializationContext(job);
    final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
    final RawComparator comparator =
      (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);

    // segments required to vacate memory
    List<Segment> memDiskSegments = new ArrayList<Segment>();
    long inMemToDiskBytes = 0;
    boolean mergePhaseFinished = false;
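    // If map outputs are still held in memory, pull enough of them into
    // memDiskSegments so that the data left in memory fits within
    // postMergeMemLimit; depending on how many disk segments already exist,
    // those segments are either merged to disk right away or kept for the
    // intermediate on-disk merge.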
    if (inMemoryMapOutputs.size() > 0) {
      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier();
      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
                                                memDiskSegments,
                                                this.postMergeMemLimit);
      final int numMemDiskSegments = memDiskSegments.size();
      if (numMemDiskSegments > 0 &&
            ioSortFactor > onDiskMapOutputs.size()) {

        // If we reach here, it implies that we have fewer than io.sort.factor
        // disk segments and that this count will be incremented by 1 (the
        // result of the in-memory segment merge). Since the total would still
        // be <= io.sort.factor, we will not do any more intermediate merges;
        // the merge of all these disk segments can be fed directly to the
        // reduce method.

        mergePhaseFinished = true;
        // must spill to disk, but can't retain in-mem for intermediate merge
        // Cannot use the spill id in the final merge as it would clobber other
        // files, hence Integer.MAX_VALUE is used.
        final Path outputPath =
          mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE,
              inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
        final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, serContext,
            memDiskSegments, numMemDiskSegments, tmpDir, comparator, progressable,
            spilledRecordsCounter, null, additionalBytesRead, null);
        final Writer writer = new Writer(serContext.getKeySerialization(),
            serContext.getValSerialization(), fs, outputPath, serContext.getKeyClass(),
            serContext.getValueClass(), codec, null, null);
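        // Merge the in-memory segments and spill the result to outputPath,
        // deleting the partial file if the write fails.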
        try {
          TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
        } catch (IOException e) {
          if (null != outputPath) {
            try {
              fs.delete(outputPath, true);
            } catch (IOException ie) {
              // NOTHING
            }
          }
          throw e;
        } finally {
          if (null != writer) {
            writer.close();
            additionalBytesWritten.increment(writer.getCompressedLength());
          }
        }

        final FileStatus fStatus = localFS.getFileStatus(outputPath);
        // add to list of final disk outputs.
        onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen()));

        if (LOG.isInfoEnabled()) {
          finalMergeLog.append("MemMerged: " + numMemDiskSegments + ", " + inMemToDiskBytes);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Merged " + numMemDiskSegments + "segments, size=" +
                inMemToDiskBytes + " to " + outputPath);
          }
        }

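        // the merged in-memory data now lives in the file added to
        // onDiskMapOutputs above, so drop it from the in-memory accounting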
        inMemToDiskBytes = 0;
        memDiskSegments.clear();
      } else if (inMemToDiskBytes != 0) {
        if (LOG.isInfoEnabled()) {
          finalMergeLog.append("DelayedMemMerge: " + numMemDiskSegments + ", " + inMemToDiskBytes);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Keeping " + numMemDiskSegments + " segments, " +
                inMemToDiskBytes + " bytes in memory for " +
                "intermediate, on-disk merge");
          }
        }
      }
    }

    // segments on disk
    List<Segment> diskSegments = new ArrayList<Segment>();
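    // inMemToDiskBytes is non-zero only when memDiskSegments are being kept
    // for the intermediate on-disk merge below, so count it as disk bytes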
    long onDiskBytes = inMemToDiskBytes;
    FileChunk[] onDisk = onDiskMapOutputs.toArray(new FileChunk[onDiskMapOutputs.size()]);
    for (FileChunk fileChunk : onDisk) {
      final long fileLength = fileChunk.getLength();
      onDiskBytes += fileLength;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Disk file=" + fileChunk.getPath() + ", len=" + fileLength +
            ", isLocal=" +
            fileChunk.isLocalFile());
      }

      final Path file = fileChunk.getPath();
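      // files that are already the product of an intermediate merge
      // (MERGED_OUTPUT_PREFIX) are not counted again as merged map outputs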
      TezCounter counter =
          file.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputsCounter;

      final long fileOffset = fileChunk.getOffset();
      final boolean preserve = fileChunk.isLocalFile();
      diskSegments.add(new DiskSegment(fs, file, fileOffset, fileLength, codec, ifileReadAhead,
                                   ifileReadAheadLength, ifileBufferSize, preserve, counter));
    }
    if (LOG.isInfoEnabled()) {
      finalMergeLog.append(". DiskSeg: " + onDisk.length + ", " + onDiskBytes);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Merging " + onDisk.length + " files, " +
            onDiskBytes + " bytes from disk");
      }
    }
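    // sort the disk segments by length so the smallest files are merged first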
    Collections.sort(diskSegments, new Comparator<Segment>() {
      public int compare(Segment o1, Segment o2) {
        if (o1.getLength() == o2.getLength()) {
          return 0;
        }
        return o1.getLength() < o2.getLength() ? -1 : 1;
      }
    });

    // build the final list of segments: remaining in-memory outputs plus the
    // merged on-disk data
    List<Segment> finalSegments = new ArrayList<Segment>();
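    // turn the map outputs still held in memory into segments that feed the
    // final merge directly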
    long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
                                             finalSegments, 0);
    if (LOG.isInfoEnabled()) {
      finalMergeLog.append(". MemSeg: " + finalSegments.size() + ", " + inMemBytes);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Merging " + finalSegments.size() + " segments, " +
            inMemBytes + " bytes from memory into reduce");
      }
    }

    if (0 != onDiskBytes) {
      final int numInMemSegments = memDiskSegments.size();
      diskSegments.addAll(0, memDiskSegments);
      memDiskSegments.clear();
      TezRawKeyValueIterator diskMerge = TezMerger.merge(
          job, fs, serContext, codec, diskSegments,
          ioSortFactor, numInMemSegments, tmpDir, comparator,
          progressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
      diskSegments.clear();
      if (0 == finalSegments.size()) {
        return diskMerge;
      }
      finalSegments.add(new Segment(
            new RawKVIteratorReader(diskMerge, onDiskBytes), null));
    }
    if (LOG.isInfoEnabled()) {
      LOG.info(finalMergeLog.toString());
    }
    // This is doing nothing but creating an iterator over the segments.
    return TezMerger.merge(job, fs, serContext, codec,
        finalSegments, finalSegments.size(), tmpDir,
        comparator, progressable, spilledRecordsCounter, null,
        additionalBytesRead, null);
  }