private void mergeParts()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java [1210:1401]


  /**
   * Produces the final output of this sorter from the spill files recorded so far.
   *
   * <p>The behaviour depends on {@code numSpills} and on {@link #isFinalMergeEnabled()}:
   * <ul>
   *   <li><b>Exactly one spill</b> - the spill is used as the output. With final merge enabled
   *       the spill file is passed to {@code sameVolRename} towards the final output path and
   *       the index is either moved the same way or written from the cached spill record.
   *       With final merge disabled the cached spill record is written to a spill index file
   *       and {@code maybeSendEventForSpill} is invoked for it.</li>
   *   <li><b>More than one spill, final merge disabled</b> - any index records that are not yet
   *       in {@code indexCacheList} are loaded, {@code maybeAddEventsForSpills} is invoked and
   *       the method returns without merging.</li>
   *   <li><b>No spills</b> - a dummy output file plus an index containing one record per
   *       partition is written, and {@code numSpills} is incremented to 1.</li>
   *   <li><b>More than one spill, final merge enabled</b> - for every partition the matching
   *       segments of all spills are merged (through the combiner when one is configured and
   *       {@code numSpills >= minSpillsForCombine}) into a single output file; the index is
   *       written and the individual spill files are deleted afterwards.</li>
   * </ul>
   *
   * <p>The final output stream is closed on both the normal and the exceptional path.
   *
   * @throws IOException if one of the file-system, merge or write operations invoked here fails
   * @throws InterruptedException declared by the signature; propagated from the calls made here
   */
  private void mergeParts() throws IOException, InterruptedException {
    // get the approximate size of the final output/index files
    long finalOutFileSize = 0;
    long finalIndexFileSize = 0;
    final Path[] filename = new Path[numSpills];
    final String taskIdentifier = outputContext.getUniqueIdentifier();

    for (int i = 0; i < numSpills; i++) {
      filename[i] = spillFilePaths.get(i);
      finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
    }
    if (numSpills == 1) { //the spill is the final output
      TezSpillRecord spillRecord = null;
      if (isFinalMergeEnabled()) {
        finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename[0]);
        finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
        sameVolRename(filename[0], finalOutputFile);
        if (indexCacheList.size() == 0) {
          // The index is not cached: move the on-disk index and read it back from there.
          sameVolRename(spillFileIndexPaths.get(0), finalIndexFile);
          spillRecord = new TezSpillRecord(finalIndexFile, localFs);
        } else {
          // The index is cached: write it out to the final index location.
          spillRecord = indexCacheList.get(0);
          spillRecord.writeToFile(finalIndexFile, conf, localFs);
        }
      } else {
        List<Event> events = Lists.newLinkedList();
        //Since there is only one spill, spill record would be present in cache.
        spillRecord = indexCacheList.get(0);
        Path indexPath = mapOutputFile.getSpillIndexFileForWrite(numSpills-1, partitions *
            MAP_OUTPUT_INDEX_RECORD_LENGTH);
        spillRecord.writeToFile(indexPath, conf, localFs);
        maybeSendEventForSpill(events, true, spillRecord, 0, true);
        fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(0)).getLen());
        //No need to populate finalIndexFile, finalOutputFile etc when finalMerge is disabled
      }
      if (spillRecord != null && reportPartitionStats()) {
        for (int i = 0; i < spillRecord.size(); i++) {
          partitionStats[i] += spillRecord.getIndex(i).getRawLength();
        }
      }
      numShuffleChunks.setValue(numSpills);
      return;
    }

    // read in paged indices
    for (int i = indexCacheList.size(); i < numSpills; ++i) {
      Path indexFileName = spillFileIndexPaths.get(i);
      indexCacheList.add(new TezSpillRecord(indexFileName, localFs));
    }

    //Check if it is needed to do final merge. Or else, exit early.
    if (numSpills > 0 && !isFinalMergeEnabled()) {
      maybeAddEventsForSpills();
      //No need to do final merge.
      return;
    }

    //make correction in the length to include the sequence file header
    //lengths for each partition
    finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
    finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;

    if (isFinalMergeEnabled()) {
      finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);
      finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
    } else if (numSpills == 0) {
      //e.g attempt_1424502260528_0119_1_07_000058_0_10012_0/file.out when final merge is
      // disabled
      finalOutputFile = mapOutputFile.getSpillFileForWrite(numSpills, finalOutFileSize);
      finalIndexFile = mapOutputFile.getSpillIndexFileForWrite(numSpills, finalIndexFileSize);
    }

    //The output stream for the final single output file
    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
    ensureSpillFilePermissions(finalOutputFile, rfs);

    if (numSpills == 0) {
      // TODO Change event generation to say there is no data rather than generating a dummy file
      //create dummy files
      long rawLength = 0;
      long partLength = 0;
      TezSpillRecord sr = new TezSpillRecord(partitions);
      try {
        for (int i = 0; i < partitions; i++) {
          long segmentStart = finalOut.getPos();
          if (!sendEmptyPartitionDetails) {
            // Open and immediately close a writer so that the (empty) partition still gets
            // whatever the writer emits for a segment; its lengths go into the index record.
            Writer writer =
                new Writer(serializationContext.getKeySerialization(),
                    serializationContext.getValSerialization(), finalOut,
                    serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
                    null, null);
            writer.close();
            rawLength = writer.getRawLength();
            partLength = writer.getCompressedLength();
          }
          TezIndexRecord rec =
              new TezIndexRecord(segmentStart, rawLength, partLength);
          // Covers the case of multiple spills.
          outputBytesWithOverheadCounter.increment(rawLength);
          sr.putIndex(rec, i);
        }
        sr.writeToFile(finalIndexFile, conf, localFs);
      } finally {
        finalOut.close();
      }
      ++numSpills;
      if (!isFinalMergeEnabled()) {
        List<Event> events = Lists.newLinkedList();
        maybeSendEventForSpill(events, true, sr, 0, true);
        fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
      }
      numShuffleChunks.setValue(numSpills);
      return;
    }

    // More than one spill and final merge is enabled: merge the spills partition by partition
    // into finalOut.
    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
    // The merge factor is a configuration value that does not vary by partition, so it is
    // looked up once instead of once per partition.
    final int mergeFactor =
        this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
            TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
    try {
      for (int parts = 0; parts < partitions; parts++) {
        boolean shouldWrite = false;
        //create the segments to be merged
        List<Segment> segmentList =
            new ArrayList<Segment>(numSpills);
        for (int i = 0; i < numSpills; i++) {
          outputContext.notifyProgress();
          TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
          // A spill contributes a segment when it has data for this partition, or always when
          // sendEmptyPartitionDetails is off.
          if (indexRecord.hasData() || !sendEmptyPartitionDetails) {
            shouldWrite = true;
            DiskSegment s =
              new DiskSegment(rfs, filename[i], indexRecord.getStartOffset(),
                               indexRecord.getPartLength(), codec, ifileReadAhead,
                               ifileReadAheadLength, ifileBufferSize, true);
            segmentList.add(s);
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug(outputContext.getInputOutputVertexNames() + ": "
                + "TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
                "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
                indexRecord.getRawLength() + ", " +
                indexRecord.getPartLength() + ")");
          }
        }

        // sort the segments only if there are intermediate merges
        boolean sortSegments = segmentList.size() > mergeFactor;
        //merge
        TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
                       serializationContext, codec,
                       segmentList, mergeFactor,
                       new Path(taskIdentifier),
                       (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
                       progressable, sortSegments, true,
                       null, spilledRecordsCounter, additionalSpillBytesRead,
                       null); // Not using any Progress in TezMerger. Should just work.

        //write merged output to disk
        long segmentStart = finalOut.getPos();
        long rawLength = 0;
        long partLength = 0;
        if (shouldWrite) {
          Writer writer = new Writer(serializationContext.getKeySerialization(),
              serializationContext.getValSerialization(), finalOut,
              serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
              spilledRecordsCounter, null);
          if (combiner == null || numSpills < minSpillsForCombine) {
            TezMerger.writeFile(kvIter, writer,
                progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
          } else {
            runCombineProcessor(kvIter, writer);
          }
          writer.close();
          rawLength = writer.getRawLength();
          partLength = writer.getCompressedLength();
        }
        outputBytesWithOverheadCounter.increment(rawLength);
        // record offsets
        final TezIndexRecord rec =
            new TezIndexRecord(segmentStart, rawLength, partLength);
        spillRec.putIndex(rec, parts);
        if (reportPartitionStats()) {
          partitionStats[parts] += rawLength;
        }
      }
      numShuffleChunks.setValue(1); //final merge has happened
      spillRec.writeToFile(finalIndexFile, conf, localFs);
    } finally {
      // Close the output stream even when the merge fails part-way, mirroring the zero-spill
      // branch above; previously the stream was only closed on the success path.
      finalOut.close();
    }
    // Only reached when the merge succeeded: the individual spill files are no longer needed.
    for (int i = 0; i < numSpills; i++) {
      rfs.delete(filename[i],true);
    }
  }