public void flush()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java [310:411]


  public void flush() throws IOException {
    final String uniqueIdentifier = outputContext.getUniqueIdentifier();
    Path finalOutputFile =
        mapOutputFile.getOutputFileForWrite(0); //TODO
    Path finalIndexFile =
        mapOutputFile.getOutputIndexFileForWrite(0); //TODO

    LOG.info("Starting flush of map output");
    span.end();
    merger.add(span.sort(sorter, comparator));
    spill();
    sortmaster.shutdown();

    largeBuffer = null;

    if(numSpills == 1) {
      // someday be able to pass this directly to shuffle
      // without writing to disk
      final Path filename =
          mapOutputFile.getSpillFile(0);
      Path indexFilename =
              mapOutputFile.getSpillIndexFile(0);
      sameVolRename(filename, mapOutputFile.getOutputFileForWriteInVolume(filename));
      sameVolRename(indexFilename, mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename));
      return;
    }
    
    //The output stream for the final single output file
    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
    final ArrayList<TezSpillRecord> indexCacheList = new ArrayList<TezSpillRecord>();

    for(int i = 0; i < numSpills; i++) {
      // TODO: build this cache before
      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
      TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
      indexCacheList.add(spillIndex);
    }
    
    for (int parts = 0; parts < partitions; parts++) {
      //create the segments to be merged
      List<Segment> segmentList =
          new ArrayList<Segment>(numSpills);
      for(int i = 0; i < numSpills; i++) {
        Path spillFilename = mapOutputFile.getSpillFile(i);
        TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

        Segment s =
            new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
                             indexRecord.getPartLength(), codec, ifileReadAhead,
                             ifileReadAheadLength, ifileBufferSize, true);
        segmentList.add(i, s);
      }

      int mergeFactor = 
              this.conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 
                  TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
      // sort the segments only if there are intermediate merges
      boolean sortSegments = segmentList.size() > mergeFactor;
      //merge
      TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
                     keyClass, valClass, codec,
                     segmentList, mergeFactor,
                     new Path(uniqueIdentifier),
                     (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), 
                     nullProgressable, sortSegments, true,
                     null, spilledRecordsCounter, null,
                     null); // Not using any Progress in TezMerger. Should just work.

      //write merged output to disk
      long segmentStart = finalOut.getPos();
      Writer writer =
          new Writer(conf, finalOut, keyClass, valClass, codec,
                           spilledRecordsCounter, null, merger.needsRLE());
      if (combiner == null || numSpills < minSpillsForCombine) {
        TezMerger.writeFile(kvIter, writer, nullProgressable, TezJobConfig.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
      } else {
        runCombineProcessor(kvIter, writer);
      }

      //close
      writer.close();

      // record offsets
      final TezIndexRecord rec = 
          new TezIndexRecord(
              segmentStart, 
              writer.getRawLength(), 
              writer.getCompressedLength());
      spillRec.putIndex(rec, parts);
    }

    spillRec.writeToFile(finalIndexFile, conf);
    finalOut.close();
    for(int i = 0; i < numSpills; i++) {
      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
      Path spillFilename = mapOutputFile.getSpillFile(i);
      rfs.delete(indexFilename,true);
      rfs.delete(spillFilename,true);
    }
  }