public void flush()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java [665:860]


  public void flush() throws IOException {
    final String uniqueIdentifier = outputContext.getUniqueIdentifier();

    outputContext.notifyProgress();
    /**
     * The thread may have been interrupted while flush was in progress, or flush may
     * never have been invoked. As part of its cleanup activity, TezTaskRunner invokes
     * close() on all inputs/outputs; it is safe to defer cleanup to that point.
     */
    if (isThreadInterrupted()) {
      return;
    }

    try {
      LOG.info(outputContext.getInputOutputVertexNames() + ": Starting flush of map output");
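      // End the active sort span, sort its contents, and queue the result for merging.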
      span.end();
      merger.add(span.sort(sorter));
      // Force a spill in flush() for two reasons.
      // Case 1: no keys were written before flush was called, but we still want
      // at least one spill, even an empty one.
      // Case 2: with pipelined shuffle there is no way to know that the last key
      // has been written until flush is called, so the flush()->spill() must be
      // forced in order to send the pipelined shuffle event with lastEvent=true.
      spill(false);
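      // No more sort tasks will be submitted; shut down the sorting thread pool.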
      sortmaster.shutdown();

      //safe to clean up
      buffers.clear();

      if (indexCacheList.isEmpty()) {
        /*
         * Without this check, a task killed mid-flight can throw an NPE here,
         * which is a needless distraction when debugging.
         */
        if (LOG.isDebugEnabled()) {
          LOG.debug(outputContext.getInputOutputVertexNames()
              + ": Index list is empty... returning");
        }
        return;
      }

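      // No final merge: consumers fetch each spill file directly, so publish one event per spill.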
      if (!isFinalMergeEnabled()) {

        // For pipelined shuffle, events for earlier spills were already sent; generate only the last event here.
        int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0;
        int endIndex = numSpills;

        for (int i = startIndex; i < endIndex; i++) {
          boolean isLastEvent = (i == numSpills - 1);
          String pathComponent = (uniqueIdentifier + "_" + i);
          ShuffleUtils.generateEventOnSpill(finalEvents, isFinalMergeEnabled(), isLastEvent,
              outputContext, i, indexCacheList.get(i), partitions,
              sendEmptyPartitionDetails, pathComponent, partitionStats,
              reportDetailedPartitionStats(), auxiliaryService, deflater);
          LOG.info(outputContext.getInputOutputVertexNames() + ": Adding spill event for spill (final update="
              + isLastEvent + "), spillId=" + i);
        }
        return;
      }

      numAdditionalSpills.increment(numSpills - 1);

      // The following code path executes when a final merge is required.
      if (numSpills == 1) {
        // someday we may be able to pass this directly to shuffle
        // without writing to disk
        final Path filename = spillFilePaths.get(0);
        final Path indexFilename = spillFileIndexPaths.get(0);
        finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename);
        finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);

        sameVolRename(filename, finalOutputFile);
        sameVolRename(indexFilename, finalIndexFile);
        if (LOG.isDebugEnabled()) {
          LOG.debug(outputContext.getInputOutputVertexNames() + ": numSpills=" + numSpills +
              ", finalOutputFile=" + finalOutputFile + ", "
              + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
              indexFilename);
        }
        TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, localFs);
        if (reportPartitionStats()) {
          for (int i = 0; i < spillRecord.size(); i++) {
            partitionStats[i] += spillRecord.getIndex(i).getRawLength();
          }
        }
        numShuffleChunks.setValue(numSpills);
        fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
        // ??? why are events not being sent here?
        return;
      }

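      // Multiple spills: merge them, partition by partition, into a single output file plus index.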
      finalOutputFile = mapOutputFile.getOutputFileForWrite(0); //TODO
      finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(0); //TODO

      if (LOG.isDebugEnabled()) {
        LOG.debug(outputContext.getInputOutputVertexNames() + ": " +
            "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
                + finalIndexFile);
      }
      //The output stream for the final single output file
      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
      ensureSpillFilePermissions(finalOutputFile, rfs);

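      // One index record per partition, recording its offset and length within the merged file.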
      final TezSpillRecord spillRec = new TezSpillRecord(partitions);

      for (int parts = 0; parts < partitions; parts++) {
        boolean shouldWrite = false;
        //create the segments to be merged
        List<Segment> segmentList = new ArrayList<Segment>(numSpills);
        for (int i = 0; i < numSpills; i++) {
          Path spillFilename = spillFilePaths.get(i);
          TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
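          // Include the segment if it has data, or unconditionally when empty-partition details are not sent via events.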
          if (indexRecord.hasData() || !sendEmptyPartitionDetails) {
            shouldWrite = true;
            DiskSegment s =
                new DiskSegment(rfs, spillFilename, indexRecord.getStartOffset(),
                    indexRecord.getPartLength(), codec, ifileReadAhead,
                    ifileReadAheadLength, ifileBufferSize, true);
            segmentList.add(s);
          }
        }

        int mergeFactor =
            this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
                TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
        // sort the segments only if there are intermediate merges
        boolean sortSegments = segmentList.size() > mergeFactor;
        //merge
        TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
            serializationContext, codec,
            segmentList, mergeFactor,
            new Path(uniqueIdentifier),
            (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
            progressable, sortSegments, true,
            null, spilledRecordsCounter, additionalSpillBytesRead,
            null, merger.needsRLE()); // Not using any Progress in TezMerger. Should just work.
        //write merged output to disk
        long segmentStart = finalOut.getPos();
        long rawLength = 0;
        long partLength = 0;
        if (shouldWrite) {
          Writer writer = new Writer(serializationContext.getKeySerialization(),
              serializationContext.getValSerialization(), finalOut,
              serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
              spilledRecordsCounter, null, merger.needsRLE());
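          // Run the combiner only when one is configured and enough spills have accumulated to justify it.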
          if (combiner == null || numSpills < minSpillsForCombine) {
            TezMerger.writeFile(kvIter, writer, progressable,
                TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
          } else {
            runCombineProcessor(kvIter, writer);
          }

          // Close the writer to finalize the raw and compressed lengths.
          writer.close();
          rawLength = writer.getRawLength();
          partLength = writer.getCompressedLength();
        }
        outputBytesWithOverheadCounter.increment(rawLength);

        // record offsets
        final TezIndexRecord rec =
            new TezIndexRecord(segmentStart, rawLength, partLength);
        spillRec.putIndex(rec, parts);
        if (reportPartitionStats()) {
          partitionStats[parts] += rawLength;
        }
      }

      numShuffleChunks.setValue(1); //final merge has happened.
      fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());

      spillRec.writeToFile(finalIndexFile, conf, localFs);
      finalOut.close();
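      // The merged output supersedes the per-spill files and indexes; delete them.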
      for (int i = 0; i < numSpills; i++) {
        Path indexFilename = spillFileIndexPaths.get(i);
        Path spillFilename = spillFilePaths.get(i);
        rfs.delete(indexFilename, true);
        rfs.delete(spillFilename, true);
      }

      spillFileIndexPaths.clear();
      spillFilePaths.clear();
    } catch (InterruptedException ie) {
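      // Clean up if requested, then re-assert the interrupt so callers observe the cancellation.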
      if (cleanup) {
        cleanup();
      }
      Thread.currentThread().interrupt();
      throw new IOInterruptedException("Interrupted while closing Output", ie);
    }
  }
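
For reference, a minimal standalone sketch (not Tez source) of the spill-event bookkeeping in the non-final-merge branch above; the class and method names are hypothetical, and pipelinedShuffle/numSpills stand in for the sorter's fields:

final class SpillEventPlan {
  /** First spill index that still needs an event in flush(). */
  static int firstEventIndex(boolean pipelinedShuffle, int numSpills) {
    // Pipelined shuffle already sent events for spills 0..numSpills-2,
    // so only the last spill is announced here.
    return pipelinedShuffle ? numSpills - 1 : 0;
  }

  /** Only the event for the last spill carries finalUpdate=true. */
  static boolean isLastEvent(int spillId, int numSpills) {
    return spillId == numSpills - 1;
  }
}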